This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ad4ae791204 branch-3.1: [opt](object client) Print requestId when s3 request failed #54066 (#54511)
ad4ae791204 is described below

commit ad4ae791204449e567e4de508d55e010ee057dcf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 12:02:13 2025 +0800

    branch-3.1: [opt](object client) Print requestId when s3 request failed #54066 (#54511)
    
    Cherry-picked from #54066
    
    Co-authored-by: Lei Zhang <[email protected]>
---
 be/src/io/fs/s3_obj_storage_client.cpp | 197 +++++++++++++++++++++------------
 cloud/src/recycler/s3_obj_client.cpp   |  40 +++----
 2 files changed, 144 insertions(+), 93 deletions(-)

diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index c6cd48f8386..e9f81da5e90 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -114,30 +114,43 @@ using Aws::S3::Model::UploadPartOutcome;
 namespace doris::io {
 using namespace Aws::S3::Model;
 
+static constexpr int S3_REQUEST_THRESHOLD_MS = 5000;
+
 ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
         const ObjectStoragePathOptions& opts) {
-    CreateMultipartUploadRequest create_request;
-    create_request.WithBucket(opts.bucket).WithKey(opts.key);
-    create_request.SetContentType("application/octet-stream");
+    CreateMultipartUploadRequest request;
+    request.WithBucket(opts.bucket).WithKey(opts.key);
+    request.SetContentType("application/octet-stream");
+
+    MonotonicStopWatch watch;
+    watch.start();
 
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
     auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            s3_put_rate_limit([&]() { return 
_client->CreateMultipartUpload(create_request); }),
-            "s3_file_writer::create_multi_part_upload", 
std::cref(create_request).get());
+            s3_put_rate_limit([&]() { return 
_client->CreateMultipartUpload(request); }),
+            "s3_file_writer::create_multi_part_upload", 
std::cref(request).get());
     SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
+    watch.stop();
 
-    if (outcome.IsSuccess()) {
-        return ObjectStorageUploadResponse {.upload_id 
{outcome.GetResult().GetUploadId()}};
+    s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+    const auto& request_id = outcome.IsSuccess() ? 
outcome.GetResult().GetRequestId()
+                                                 : 
outcome.GetError().GetRequestId();
+
+    LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+            << "CreateMultipartUpload cost=" << 
watch.elapsed_time_milliseconds() << "ms"
+            << ", request_id=" << request_id << ", bucket=" << opts.bucket << 
", key=" << opts.key;
+
+    if (!outcome.IsSuccess()) {
+        auto st = s3fs_error(outcome.GetError(), fmt::format("failed to 
CreateMultipartUpload: {} ",
+                                                             
opts.path.native()));
+        LOG(WARNING) << st << " request_id=" << request_id;
+        return ObjectStorageUploadResponse {
+                .resp = {convert_to_obj_response(std::move(st)),
+                         
static_cast<int>(outcome.GetError().GetResponseCode()),
+                         outcome.GetError().GetRequestId()},
+        };
     }
 
-    return ObjectStorageUploadResponse {
-            .resp = {convert_to_obj_response(
-                             s3fs_error(outcome.GetError(),
-                                        fmt::format("failed to create 
multipart upload {} ",
-                                                    opts.path.native()))),
-                     static_cast<int>(outcome.GetError().GetResponseCode()),
-                     outcome.GetError().GetRequestId()},
-    };
+    return ObjectStorageUploadResponse {.upload_id 
{outcome.GetResult().GetUploadId()}};
 }
 
 ObjectStorageResponse S3ObjStorageClient::put_object(const 
ObjectStoragePathOptions& opts,
@@ -150,68 +163,91 @@ ObjectStorageResponse 
S3ObjStorageClient::put_object(const ObjectStoragePathOpti
     request.SetBody(string_view_stream);
     request.SetContentLength(stream.size());
     request.SetContentType("application/octet-stream");
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
-    auto response = SYNC_POINT_HOOK_RETURN_VALUE(
+
+    MonotonicStopWatch watch;
+    watch.start();
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
             s3_put_rate_limit([&]() { return _client->PutObject(request); }),
             "s3_file_writer::put_object", std::cref(request).get(), &stream);
-    if (!response.IsSuccess()) {
-        auto st = s3fs_error(response.GetError(),
-                             fmt::format("failed to put object {}", 
opts.path.native()));
-        LOG(WARNING) << st;
+
+    watch.stop();
+
+    s3_bvar::s3_put_latency << watch.elapsed_time_microseconds();
+    const auto& request_id = outcome.IsSuccess() ? 
outcome.GetResult().GetRequestId()
+                                                 : 
outcome.GetError().GetRequestId();
+
+    if (!outcome.IsSuccess()) {
+        auto st = s3fs_error(outcome.GetError(),
+                             fmt::format("failed to put object: {}", 
opts.path.native()));
+        LOG(WARNING) << st << ", request_id=" << request_id;
         return ObjectStorageResponse {convert_to_obj_response(std::move(st)),
-                                      
static_cast<int>(response.GetError().GetResponseCode()),
-                                      response.GetError().GetRequestId()};
+                                      
static_cast<int>(outcome.GetError().GetResponseCode()),
+                                      request_id};
     }
+
+    LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+            << "PutObject cost=" << watch.elapsed_time_milliseconds() << "ms"
+            << ", request_id=" << request_id << ", bucket=" << opts.bucket << 
", key=" << opts.key;
     return ObjectStorageResponse::OK();
 }
 
 ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const 
ObjectStoragePathOptions& opts,
                                                             std::string_view 
stream, int part_num) {
-    UploadPartRequest upload_request;
-    upload_request.WithBucket(opts.bucket)
+    UploadPartRequest request;
+    request.WithBucket(opts.bucket)
             .WithKey(opts.key)
             .WithPartNumber(part_num)
             .WithUploadId(*opts.upload_id);
     auto string_view_stream = 
std::make_shared<StringViewStream>(stream.data(), stream.size());
 
-    upload_request.SetBody(string_view_stream);
+    request.SetBody(string_view_stream);
 
     Aws::Utils::ByteBuffer 
part_md5(Aws::Utils::HashingUtils::CalculateMD5(*string_view_stream));
-    
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+    request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+    request.SetContentLength(stream.size());
+    request.SetContentType("application/octet-stream");
+
+    MonotonicStopWatch watch;
+    watch.start();
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return _client->UploadPart(request); }),
+            "s3_file_writer::upload_part", std::cref(request).get(), &stream);
 
-    upload_request.SetContentLength(stream.size());
-    upload_request.SetContentType("application/octet-stream");
+    watch.stop();
 
-    UploadPartOutcome upload_part_outcome;
-    {
-        SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-        upload_part_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                s3_put_rate_limit([&]() { return 
_client->UploadPart(upload_request); }),
-                "s3_file_writer::upload_part", 
std::cref(upload_request).get(), &stream);
-    }
-    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", 
&upload_part_outcome);
-    if (!upload_part_outcome.IsSuccess()) {
-        auto s = Status::IOError(
-                "failed to upload part (bucket={}, key={}, part_num={}, 
up_load_id={}): {}, "
-                "exception {}, error code {}",
+    s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+    const auto& request_id = outcome.IsSuccess() ? 
outcome.GetResult().GetRequestId()
+                                                 : 
outcome.GetError().GetRequestId();
+
+    TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &outcome);
+    if (!outcome.IsSuccess()) {
+        auto st = Status::IOError(
+                "failed to UploadPart bucket={}, key={}, part_num={}, 
upload_id={}, message={}, "
+                "exception_name={}, response_code={}, request_id={}",
                 opts.bucket, opts.path.native(), part_num, *opts.upload_id,
-                upload_part_outcome.GetError().GetMessage(),
-                upload_part_outcome.GetError().GetExceptionName(),
-                upload_part_outcome.GetError().GetResponseCode());
-        LOG_WARNING(s.to_string());
+                outcome.GetError().GetMessage(), 
outcome.GetError().GetExceptionName(),
+                outcome.GetError().GetResponseCode(), request_id);
+
+        LOG(WARNING) << st << ", request_id=" << request_id;
         return ObjectStorageUploadResponse {
-                .resp = {convert_to_obj_response(std::move(s)),
-                         
static_cast<int>(upload_part_outcome.GetError().GetResponseCode()),
-                         upload_part_outcome.GetError().GetRequestId()}};
+                .resp = {convert_to_obj_response(std::move(st)),
+                         
static_cast<int>(outcome.GetError().GetResponseCode()),
+                         outcome.GetError().GetRequestId()}};
     }
-    return ObjectStorageUploadResponse {.etag = 
upload_part_outcome.GetResult().GetETag()};
+
+    LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+            << "UploadPart cost=" << watch.elapsed_time_milliseconds() << "ms"
+            << ", request_id=" << request_id << ", bucket=" << opts.bucket << 
", key=" << opts.key
+            << ", part_num=" << part_num << ", upload_id=" << *opts.upload_id;
+    return ObjectStorageUploadResponse {.etag = outcome.GetResult().GetETag()};
 }
 
 ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
         const ObjectStoragePathOptions& opts,
         const std::vector<ObjectCompleteMultiPart>& completed_parts) {
-    CompleteMultipartUploadRequest complete_request;
-    
complete_request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
+    CompleteMultipartUploadRequest request;
+    
request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
 
     CompletedMultipartUpload completed_upload;
     std::vector<CompletedPart> complete_parts;
@@ -223,23 +259,35 @@ ObjectStorageResponse 
S3ObjStorageClient::complete_multipart_upload(
                                return part;
                            });
     completed_upload.SetParts(std::move(complete_parts));
-    complete_request.WithMultipartUpload(completed_upload);
+    request.WithMultipartUpload(completed_upload);
 
     TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3", 
ObjectStorageResponse(), this);
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-    auto complete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            s3_put_rate_limit([&]() { return 
_client->CompleteMultipartUpload(complete_request); }),
-            "s3_file_writer::complete_multi_part", 
std::cref(complete_request).get());
-
-    if (!complete_outcome.IsSuccess()) {
-        auto st = s3fs_error(complete_outcome.GetError(),
-                             fmt::format("failed to complete multi part upload 
{}, upload_id={}",
+
+    MonotonicStopWatch watch;
+    watch.start();
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_put_rate_limit([&]() { return 
_client->CompleteMultipartUpload(request); }),
+            "s3_file_writer::complete_multi_part", std::cref(request).get());
+
+    watch.stop();
+    s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+    const auto& request_id = outcome.IsSuccess() ? 
outcome.GetResult().GetRequestId()
+                                                 : 
outcome.GetError().GetRequestId();
+
+    if (!outcome.IsSuccess()) {
+        auto st = s3fs_error(outcome.GetError(),
+                             fmt::format("failed to CompleteMultipartUpload: 
{}, upload_id={}",
                                          opts.path.native(), *opts.upload_id));
-        LOG(WARNING) << st;
+        LOG(WARNING) << st << ", request_id=" << request_id;
         return {convert_to_obj_response(std::move(st)),
-                
static_cast<int>(complete_outcome.GetError().GetResponseCode()),
-                complete_outcome.GetError().GetRequestId()};
+                static_cast<int>(outcome.GetError().GetResponseCode()),
+                outcome.GetError().GetRequestId()};
     }
+
+    LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+            << "CompleteMultipartUpload cost=" << 
watch.elapsed_time_milliseconds() << "ms"
+            << ", request_id=" << request_id << ", bucket=" << opts.bucket << 
", key=" << opts.key
+            << ", upload_id=" << *opts.upload_id;
     return ObjectStorageResponse::OK();
 }
 
@@ -287,9 +335,9 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const 
ObjectStoragePathOpti
     }
     *size_return = outcome.GetResult().GetContentLength();
     if (*size_return != bytes_read) {
-        return {convert_to_obj_response(
-                Status::InternalError("failed to read from {}(bytes read: {}, 
bytes req: {})",
-                                      opts.path.native(), *size_return, 
bytes_read))};
+        return {convert_to_obj_response(Status::InternalError(
+                "failed to read from {}(bytes read: {}, bytes req: {}), 
request_id: {}",
+                opts.path.native(), *size_return, bytes_read, 
outcome.GetResult().GetRequestId()))};
     }
     return ObjectStorageResponse::OK();
 }
@@ -323,9 +371,10 @@ ObjectStorageResponse 
S3ObjStorageClient::list_objects(const ObjectStoragePathOp
         }
         is_trucated = outcome.GetResult().GetIsTruncated();
         if (is_trucated && 
outcome.GetResult().GetNextContinuationToken().empty()) {
-            return {convert_to_obj_response(Status::InternalError(
-                    "failed to list {}, is_trucated is true, but next 
continuation token is empty",
-                    opts.prefix))};
+            return {convert_to_obj_response(
+                    Status::InternalError("failed to list {}, is_trucated is 
true, but next "
+                                          "continuation token is empty, 
request_id={}",
+                                          opts.prefix, 
outcome.GetResult().GetRequestId()))};
         }
 
         
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
@@ -358,8 +407,9 @@ ObjectStorageResponse 
S3ObjStorageClient::delete_objects(const ObjectStoragePath
     }
     if (!delete_outcome.GetResult().GetErrors().empty()) {
         const auto& e = delete_outcome.GetResult().GetErrors().front();
-        return {convert_to_obj_response(Status::InternalError("failed to 
delete object {}: {}",
-                                                              e.GetKey(), 
e.GetMessage()))};
+        return {convert_to_obj_response(
+                Status::InternalError("failed to delete object {}: {}, 
request_id={}", e.GetKey(),
+                                      e.GetMessage(), 
delete_outcome.GetResult().GetRequestId()))};
     }
     return ObjectStorageResponse::OK();
 }
@@ -423,7 +473,8 @@ ObjectStorageResponse 
S3ObjStorageClient::delete_objects_recursively(
             if (!delete_outcome.GetResult().GetErrors().empty()) {
                 const auto& e = delete_outcome.GetResult().GetErrors().front();
                 return {convert_to_obj_response(Status::InternalError(
-                        "failed to delete object {}: {}", opts.key, 
e.GetMessage()))};
+                        "failed to delete object {}: {}, request_id={}", 
opts.key, e.GetMessage(),
+                        delete_outcome.GetResult().GetRequestId()))};
             }
         }
         is_trucated = result.GetIsTruncated();
diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp
index a5a8977e17b..fe4c368ceda 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -96,13 +96,16 @@ public:
             return client_->ListObjectsV2(req_);
         });
 
+        const auto& request_id = outcome.IsSuccess() ? 
outcome.GetResult().GetRequestId()
+                                                     : 
outcome.GetError().GetRequestId();
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
                     .tag("endpoint", endpoint_)
                     .tag("bucket", req_.GetBucket())
                     .tag("prefix", req_.GetPrefix())
                     .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                    .tag("error", outcome.GetError().GetMessage());
+                    .tag("error", outcome.GetError().GetMessage())
+                    .tag("request_id", request_id);
             is_valid_ = false;
             return false;
         }
@@ -112,7 +115,8 @@ public:
             LOG_WARNING("failed to list objects, isTruncated but no 
continuation token")
                     .tag("endpoint", endpoint_)
                     .tag("bucket", req_.GetBucket())
-                    .tag("prefix", req_.GetPrefix());
+                    .tag("prefix", req_.GetPrefix())
+                    .tag("request_id", request_id);
 
             is_valid_ = false;
             return false;
@@ -123,7 +127,8 @@ public:
                 
const_cast<std::string&&>(outcome.GetResult().GetNextContinuationToken())));
 
         auto&& content = outcome.GetResult().GetContents();
-        DCHECK(!(has_more_ && content.empty())) << has_more_ << ' ' << 
content.empty();
+        DCHECK(!(has_more_ && content.empty()))
+                << has_more_ << ' ' << content.empty() << " request_id=" << 
request_id;
 
         results_.reserve(content.size());
         for (auto&& obj : std::ranges::reverse_view(content)) {
@@ -178,7 +183,8 @@ ObjectStorageResponse 
S3ObjClient::put_object(ObjectStoragePathRef path, std::st
                 .tag("bucket", path.bucket)
                 .tag("key", path.key)
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("request_id", outcome.GetError().GetRequestId());
         return -1;
     }
     return 0;
@@ -204,7 +210,8 @@ ObjectStorageResponse 
S3ObjClient::head_object(ObjectStoragePathRef path, Object
                 .tag("bucket", path.bucket)
                 .tag("key", path.key)
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("request_id", outcome.GetError().GetRequestId());
         return -1;
     }
 }
@@ -243,18 +250,8 @@ ObjectStorageResponse S3ObjClient::delete_objects(const 
std::string& bucket,
                     .tag("key[0]", 
delete_request.GetDelete().GetObjects().front().GetKey())
                     .tag("responseCode",
                          
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
-                    .tag("error", delete_outcome.GetError().GetMessage());
-            return -1;
-        }
-
-        if (!delete_outcome.IsSuccess()) {
-            LOG_WARNING("failed to delete objects")
-                    .tag("endpoint", endpoint_)
-                    .tag("bucket", bucket)
-                    .tag("key[0]", 
delete_request.GetDelete().GetObjects().front().GetKey())
-                    .tag("responseCode",
-                         
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
-                    .tag("error", delete_outcome.GetError().GetMessage());
+                    .tag("error", delete_outcome.GetError().GetMessage())
+                    .tag("request_id", 
delete_outcome.GetError().GetRequestId());
             return -1;
         }
 
@@ -303,7 +300,8 @@ ObjectStorageResponse 
S3ObjClient::delete_object(ObjectStoragePathRef path) {
                 .tag("key", path.key)
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
                 .tag("error", outcome.GetError().GetMessage())
-                .tag("exception", outcome.GetError().GetExceptionName());
+                .tag("exception", outcome.GetError().GetExceptionName())
+                .tag("request_id", outcome.GetError().GetRequestId());
         if (outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
             return {ObjectStorageResponse::NOT_FOUND, 
outcome.GetError().GetMessage()};
         }
@@ -339,7 +337,8 @@ ObjectStorageResponse S3ObjClient::get_life_cycle(const 
std::string& bucket,
                 .tag("endpoint", endpoint_)
                 .tag("bucket", bucket)
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("request_id", outcome.GetError().GetRequestId());
         return -1;
     }
 
@@ -370,7 +369,8 @@ ObjectStorageResponse S3ObjClient::check_versioning(const 
std::string& bucket) {
                 .tag("endpoint", endpoint_)
                 .tag("bucket", bucket)
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("request_id", outcome.GetError().GetRequestId());
         return -1;
     }
     return 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to