This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ad4ae791204 branch-3.1: [opt](object client) Print requestId when s3 request failed #54066 (#54511)
ad4ae791204 is described below
commit ad4ae791204449e567e4de508d55e010ee057dcf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 12:02:13 2025 +0800
branch-3.1: [opt](object client) Print requestId when s3 request failed #54066 (#54511)
Cherry-picked from #54066
Co-authored-by: Lei Zhang <[email protected]>
---
be/src/io/fs/s3_obj_storage_client.cpp | 197 +++++++++++++++++++++------------
cloud/src/recycler/s3_obj_client.cpp | 40 +++----
2 files changed, 144 insertions(+), 93 deletions(-)
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index c6cd48f8386..e9f81da5e90 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -114,30 +114,43 @@ using Aws::S3::Model::UploadPartOutcome;
namespace doris::io {
using namespace Aws::S3::Model;
+static constexpr int S3_REQUEST_THRESHOLD_MS = 5000;
+
ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
const ObjectStoragePathOptions& opts) {
- CreateMultipartUploadRequest create_request;
- create_request.WithBucket(opts.bucket).WithKey(opts.key);
- create_request.SetContentType("application/octet-stream");
+ CreateMultipartUploadRequest request;
+ request.WithBucket(opts.bucket).WithKey(opts.key);
+ request.SetContentType("application/octet-stream");
+
+ MonotonicStopWatch watch;
+ watch.start();
- SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_put_rate_limit([&]() { return
_client->CreateMultipartUpload(create_request); }),
- "s3_file_writer::create_multi_part_upload",
std::cref(create_request).get());
+ s3_put_rate_limit([&]() { return
_client->CreateMultipartUpload(request); }),
+ "s3_file_writer::create_multi_part_upload",
std::cref(request).get());
SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
+ watch.stop();
- if (outcome.IsSuccess()) {
- return ObjectStorageUploadResponse {.upload_id
{outcome.GetResult().GetUploadId()}};
+ s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+ const auto& request_id = outcome.IsSuccess() ?
outcome.GetResult().GetRequestId()
+ :
outcome.GetError().GetRequestId();
+
+ LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+ << "CreateMultipartUpload cost=" <<
watch.elapsed_time_milliseconds() << "ms"
+ << ", request_id=" << request_id << ", bucket=" << opts.bucket <<
", key=" << opts.key;
+
+ if (!outcome.IsSuccess()) {
+ auto st = s3fs_error(outcome.GetError(), fmt::format("failed to
CreateMultipartUpload: {} ",
+
opts.path.native()));
+ LOG(WARNING) << st << " request_id=" << request_id;
+ return ObjectStorageUploadResponse {
+ .resp = {convert_to_obj_response(std::move(st)),
+
static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()},
+ };
}
- return ObjectStorageUploadResponse {
- .resp = {convert_to_obj_response(
- s3fs_error(outcome.GetError(),
- fmt::format("failed to create
multipart upload {} ",
- opts.path.native()))),
- static_cast<int>(outcome.GetError().GetResponseCode()),
- outcome.GetError().GetRequestId()},
- };
+ return ObjectStorageUploadResponse {.upload_id
{outcome.GetResult().GetUploadId()}};
}
ObjectStorageResponse S3ObjStorageClient::put_object(const
ObjectStoragePathOptions& opts,
@@ -150,68 +163,91 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti
request.SetBody(string_view_stream);
request.SetContentLength(stream.size());
request.SetContentType("application/octet-stream");
- SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
- auto response = SYNC_POINT_HOOK_RETURN_VALUE(
+
+ MonotonicStopWatch watch;
+ watch.start();
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_put_rate_limit([&]() { return _client->PutObject(request); }),
"s3_file_writer::put_object", std::cref(request).get(), &stream);
- if (!response.IsSuccess()) {
- auto st = s3fs_error(response.GetError(),
- fmt::format("failed to put object {}",
opts.path.native()));
- LOG(WARNING) << st;
+
+ watch.stop();
+
+ s3_bvar::s3_put_latency << watch.elapsed_time_microseconds();
+ const auto& request_id = outcome.IsSuccess() ?
outcome.GetResult().GetRequestId()
+ :
outcome.GetError().GetRequestId();
+
+ if (!outcome.IsSuccess()) {
+ auto st = s3fs_error(outcome.GetError(),
+ fmt::format("failed to put object: {}",
opts.path.native()));
+ LOG(WARNING) << st << ", request_id=" << request_id;
return ObjectStorageResponse {convert_to_obj_response(std::move(st)),
-
static_cast<int>(response.GetError().GetResponseCode()),
- response.GetError().GetRequestId()};
+
static_cast<int>(outcome.GetError().GetResponseCode()),
+ request_id};
}
+
+ LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+ << "PutObject cost=" << watch.elapsed_time_milliseconds() << "ms"
+ << ", request_id=" << request_id << ", bucket=" << opts.bucket <<
", key=" << opts.key;
return ObjectStorageResponse::OK();
}
ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const
ObjectStoragePathOptions& opts,
std::string_view
stream, int part_num) {
- UploadPartRequest upload_request;
- upload_request.WithBucket(opts.bucket)
+ UploadPartRequest request;
+ request.WithBucket(opts.bucket)
.WithKey(opts.key)
.WithPartNumber(part_num)
.WithUploadId(*opts.upload_id);
auto string_view_stream =
std::make_shared<StringViewStream>(stream.data(), stream.size());
- upload_request.SetBody(string_view_stream);
+ request.SetBody(string_view_stream);
Aws::Utils::ByteBuffer
part_md5(Aws::Utils::HashingUtils::CalculateMD5(*string_view_stream));
-
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+ request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+ request.SetContentLength(stream.size());
+ request.SetContentType("application/octet-stream");
+
+ MonotonicStopWatch watch;
+ watch.start();
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_put_rate_limit([&]() { return _client->UploadPart(request); }),
+ "s3_file_writer::upload_part", std::cref(request).get(), &stream);
- upload_request.SetContentLength(stream.size());
- upload_request.SetContentType("application/octet-stream");
+ watch.stop();
- UploadPartOutcome upload_part_outcome;
- {
- SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
- upload_part_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_put_rate_limit([&]() { return
_client->UploadPart(upload_request); }),
- "s3_file_writer::upload_part",
std::cref(upload_request).get(), &stream);
- }
- TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part",
&upload_part_outcome);
- if (!upload_part_outcome.IsSuccess()) {
- auto s = Status::IOError(
- "failed to upload part (bucket={}, key={}, part_num={},
up_load_id={}): {}, "
- "exception {}, error code {}",
+ s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+ const auto& request_id = outcome.IsSuccess() ?
outcome.GetResult().GetRequestId()
+ :
outcome.GetError().GetRequestId();
+
+ TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &outcome);
+ if (!outcome.IsSuccess()) {
+ auto st = Status::IOError(
+ "failed to UploadPart bucket={}, key={}, part_num={},
upload_id={}, message={}, "
+ "exception_name={}, response_code={}, request_id={}",
opts.bucket, opts.path.native(), part_num, *opts.upload_id,
- upload_part_outcome.GetError().GetMessage(),
- upload_part_outcome.GetError().GetExceptionName(),
- upload_part_outcome.GetError().GetResponseCode());
- LOG_WARNING(s.to_string());
+ outcome.GetError().GetMessage(),
outcome.GetError().GetExceptionName(),
+ outcome.GetError().GetResponseCode(), request_id);
+
+ LOG(WARNING) << st << ", request_id=" << request_id;
return ObjectStorageUploadResponse {
- .resp = {convert_to_obj_response(std::move(s)),
-
static_cast<int>(upload_part_outcome.GetError().GetResponseCode()),
- upload_part_outcome.GetError().GetRequestId()}};
+ .resp = {convert_to_obj_response(std::move(st)),
+
static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()}};
}
- return ObjectStorageUploadResponse {.etag =
upload_part_outcome.GetResult().GetETag()};
+
+ LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+ << "UploadPart cost=" << watch.elapsed_time_milliseconds() << "ms"
+ << ", request_id=" << request_id << ", bucket=" << opts.bucket <<
", key=" << opts.key
+ << ", part_num=" << part_num << ", upload_id=" << *opts.upload_id;
+ return ObjectStorageUploadResponse {.etag = outcome.GetResult().GetETag()};
}
ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
const ObjectStoragePathOptions& opts,
const std::vector<ObjectCompleteMultiPart>& completed_parts) {
- CompleteMultipartUploadRequest complete_request;
-
complete_request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
+ CompleteMultipartUploadRequest request;
+
request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
CompletedMultipartUpload completed_upload;
std::vector<CompletedPart> complete_parts;
@@ -223,23 +259,35 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
return part;
});
completed_upload.SetParts(std::move(complete_parts));
- complete_request.WithMultipartUpload(completed_upload);
+ request.WithMultipartUpload(completed_upload);
TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3",
ObjectStorageResponse(), this);
- SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
- auto complete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_put_rate_limit([&]() { return
_client->CompleteMultipartUpload(complete_request); }),
- "s3_file_writer::complete_multi_part",
std::cref(complete_request).get());
-
- if (!complete_outcome.IsSuccess()) {
- auto st = s3fs_error(complete_outcome.GetError(),
- fmt::format("failed to complete multi part upload
{}, upload_id={}",
+
+ MonotonicStopWatch watch;
+ watch.start();
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_put_rate_limit([&]() { return
_client->CompleteMultipartUpload(request); }),
+ "s3_file_writer::complete_multi_part", std::cref(request).get());
+
+ watch.stop();
+ s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
+ const auto& request_id = outcome.IsSuccess() ?
outcome.GetResult().GetRequestId()
+ :
outcome.GetError().GetRequestId();
+
+ if (!outcome.IsSuccess()) {
+ auto st = s3fs_error(outcome.GetError(),
+ fmt::format("failed to CompleteMultipartUpload:
{}, upload_id={}",
opts.path.native(), *opts.upload_id));
- LOG(WARNING) << st;
+ LOG(WARNING) << st << ", request_id=" << request_id;
return {convert_to_obj_response(std::move(st)),
-
static_cast<int>(complete_outcome.GetError().GetResponseCode()),
- complete_outcome.GetError().GetRequestId()};
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()};
}
+
+ LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
+ << "CompleteMultipartUpload cost=" <<
watch.elapsed_time_milliseconds() << "ms"
+ << ", request_id=" << request_id << ", bucket=" << opts.bucket <<
", key=" << opts.key
+ << ", upload_id=" << *opts.upload_id;
return ObjectStorageResponse::OK();
}
@@ -287,9 +335,9 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
}
*size_return = outcome.GetResult().GetContentLength();
if (*size_return != bytes_read) {
- return {convert_to_obj_response(
- Status::InternalError("failed to read from {}(bytes read: {},
bytes req: {})",
- opts.path.native(), *size_return,
bytes_read))};
+ return {convert_to_obj_response(Status::InternalError(
+ "failed to read from {}(bytes read: {}, bytes req: {}),
request_id: {}",
+ opts.path.native(), *size_return, bytes_read,
outcome.GetResult().GetRequestId()))};
}
return ObjectStorageResponse::OK();
}
@@ -323,9 +371,10 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
}
is_trucated = outcome.GetResult().GetIsTruncated();
if (is_trucated &&
outcome.GetResult().GetNextContinuationToken().empty()) {
- return {convert_to_obj_response(Status::InternalError(
- "failed to list {}, is_trucated is true, but next
continuation token is empty",
- opts.prefix))};
+ return {convert_to_obj_response(
+ Status::InternalError("failed to list {}, is_trucated is
true, but next "
+ "continuation token is empty,
request_id={}",
+ opts.prefix,
outcome.GetResult().GetRequestId()))};
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
@@ -358,8 +407,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
}
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
- return {convert_to_obj_response(Status::InternalError("failed to
delete object {}: {}",
- e.GetKey(),
e.GetMessage()))};
+ return {convert_to_obj_response(
+ Status::InternalError("failed to delete object {}: {},
request_id={}", e.GetKey(),
+ e.GetMessage(),
delete_outcome.GetResult().GetRequestId()))};
}
return ObjectStorageResponse::OK();
}
@@ -423,7 +473,8 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
return {convert_to_obj_response(Status::InternalError(
- "failed to delete object {}: {}", opts.key,
e.GetMessage()))};
+ "failed to delete object {}: {}, request_id={}",
opts.key, e.GetMessage(),
+ delete_outcome.GetResult().GetRequestId()))};
}
}
is_trucated = result.GetIsTruncated();
diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp
index a5a8977e17b..fe4c368ceda 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -96,13 +96,16 @@ public:
return client_->ListObjectsV2(req_);
});
+ const auto& request_id = outcome.IsSuccess() ?
outcome.GetResult().GetRequestId()
+ :
outcome.GetError().GetRequestId();
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to list objects")
.tag("endpoint", endpoint_)
.tag("bucket", req_.GetBucket())
.tag("prefix", req_.GetPrefix())
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("request_id", request_id);
is_valid_ = false;
return false;
}
@@ -112,7 +115,8 @@ public:
LOG_WARNING("failed to list objects, isTruncated but no
continuation token")
.tag("endpoint", endpoint_)
.tag("bucket", req_.GetBucket())
- .tag("prefix", req_.GetPrefix());
+ .tag("prefix", req_.GetPrefix())
+ .tag("request_id", request_id);
is_valid_ = false;
return false;
@@ -123,7 +127,8 @@ public:
const_cast<std::string&&>(outcome.GetResult().GetNextContinuationToken())));
auto&& content = outcome.GetResult().GetContents();
- DCHECK(!(has_more_ && content.empty())) << has_more_ << ' ' <<
content.empty();
+ DCHECK(!(has_more_ && content.empty()))
+ << has_more_ << ' ' << content.empty() << " request_id=" <<
request_id;
results_.reserve(content.size());
for (auto&& obj : std::ranges::reverse_view(content)) {
@@ -178,7 +183,8 @@ ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::st
.tag("bucket", path.bucket)
.tag("key", path.key)
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("request_id", outcome.GetError().GetRequestId());
return -1;
}
return 0;
@@ -204,7 +210,8 @@ ObjectStorageResponse S3ObjClient::head_object(ObjectStoragePathRef path, Object
.tag("bucket", path.bucket)
.tag("key", path.key)
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("request_id", outcome.GetError().GetRequestId());
return -1;
}
}
@@ -243,18 +250,8 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
.tag("key[0]",
delete_request.GetDelete().GetObjects().front().GetKey())
.tag("responseCode",
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
- .tag("error", delete_outcome.GetError().GetMessage());
- return -1;
- }
-
- if (!delete_outcome.IsSuccess()) {
- LOG_WARNING("failed to delete objects")
- .tag("endpoint", endpoint_)
- .tag("bucket", bucket)
- .tag("key[0]",
delete_request.GetDelete().GetObjects().front().GetKey())
- .tag("responseCode",
-
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
- .tag("error", delete_outcome.GetError().GetMessage());
+ .tag("error", delete_outcome.GetError().GetMessage())
+ .tag("request_id",
delete_outcome.GetError().GetRequestId());
return -1;
}
@@ -303,7 +300,8 @@ ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
.tag("key", path.key)
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
.tag("error", outcome.GetError().GetMessage())
- .tag("exception", outcome.GetError().GetExceptionName());
+ .tag("exception", outcome.GetError().GetExceptionName())
+ .tag("request_id", outcome.GetError().GetRequestId());
if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
return {ObjectStorageResponse::NOT_FOUND,
outcome.GetError().GetMessage()};
}
@@ -339,7 +337,8 @@ ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket,
.tag("endpoint", endpoint_)
.tag("bucket", bucket)
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("request_id", outcome.GetError().GetRequestId());
return -1;
}
@@ -370,7 +369,8 @@ ObjectStorageResponse S3ObjClient::check_versioning(const std::string& bucket) {
.tag("endpoint", endpoint_)
.tag("bucket", bucket)
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("request_id", outcome.GetError().GetRequestId());
return -1;
}
return 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]