This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31d8df8de28 (enhance)(S3) Change s3 metric from bvar adder to latency recorder (#28861)
31d8df8de28 is described below
commit 31d8df8de28250058e9b9e4f9cafb5496994e259
Author: AlexYue <[email protected]>
AuthorDate: Mon Feb 19 15:05:05 2024 +0800
(enhance)(S3) Change s3 metric from bvar adder to latency recorder (#28861)
---
be/src/io/fs/err_utils.cpp | 15 ++++++++-------
be/src/io/fs/s3_file_reader.cpp | 3 ++-
be/src/io/fs/s3_file_system.cpp | 32 +++++++++++++++++++++-----------
be/src/io/fs/s3_file_writer.cpp | 32 ++++++++++++++------------------
be/src/util/s3_util.cpp | 18 +++++++++---------
be/src/util/s3_util.h | 19 ++++++++++---------
6 files changed, 64 insertions(+), 55 deletions(-)
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index a9eeae7862f..5688b888f44 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -115,15 +115,16 @@ Status s3fs_error(const Aws::S3::S3Error& err,
std::string_view msg) {
using namespace Aws::Http;
switch (err.GetResponseCode()) {
case HttpResponseCode::NOT_FOUND:
- return Status::Error<NOT_FOUND, false>("{}: {} {}", msg,
err.GetExceptionName(),
- err.GetMessage());
+ return Status::Error<NOT_FOUND, false>("{}: {} {} type={}", msg,
err.GetExceptionName(),
+ err.GetMessage(),
err.GetErrorType());
case HttpResponseCode::FORBIDDEN:
- return Status::Error<PERMISSION_DENIED, false>("{}: {} {}", msg,
err.GetExceptionName(),
- err.GetMessage());
+ return Status::Error<PERMISSION_DENIED, false>("{}: {} {} type={}",
msg,
+ err.GetExceptionName(),
err.GetMessage(),
+ err.GetErrorType());
default:
- return Status::Error<doris::INTERNAL_ERROR, false>("{}: {} {}
code={}", msg,
-
err.GetExceptionName(), err.GetMessage(),
-
err.GetResponseCode());
+ return Status::Error<doris::INTERNAL_ERROR, false>(
+ "{}: {} {} code={} type={}", msg, err.GetExceptionName(),
err.GetMessage(),
+ err.GetResponseCode(), err.GetErrorType());
}
}
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 61569375d7e..c78eabd09d7 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -33,6 +33,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/fs/err_utils.h"
#include "io/fs/s3_common.h"
+#include "util/bvar_helper.h"
#include "util/doris_metrics.h"
#include "util/s3_util.h"
@@ -96,8 +97,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
if (!client) {
return Status::InternalError("init s3 client error");
}
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
auto outcome = client->GetObject(request);
- s3_bvar::s3_get_total << 1;
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to read from {}",
_path.native()));
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 77f303ffdc0..207bd9dff49 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -71,6 +71,7 @@
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_reader.h"
#include "io/fs/s3_file_writer.h"
+#include "util/bvar_helper.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -166,8 +167,8 @@ Status S3FileSystem::delete_file_impl(const Path& file) {
GET_KEY(key, file);
request.WithBucket(_s3_conf.bucket).WithKey(key);
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto outcome = client->DeleteObject(request);
- s3_bvar::s3_delete_total << 1;
if (outcome.IsSuccess() ||
outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
return Status::OK();
@@ -190,8 +191,11 @@ Status S3FileSystem::delete_directory_impl(const Path&
dir) {
delete_request.SetBucket(_s3_conf.bucket);
bool is_trucated = false;
do {
- auto outcome = client->ListObjectsV2(request);
- s3_bvar::s3_list_total << 1;
+ Aws::S3::Model::ListObjectsV2Outcome outcome;
+ {
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+ outcome = client->ListObjectsV2(request);
+ }
if (!outcome.IsSuccess()) {
return s3fs_error(
outcome.GetError(),
@@ -207,8 +211,8 @@ Status S3FileSystem::delete_directory_impl(const Path& dir)
{
Aws::S3::Model::Delete del;
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = client->DeleteObjects(delete_request);
- s3_bvar::s3_delete_total << 1;
if (!delete_outcome.IsSuccess()) {
return s3fs_error(delete_outcome.GetError(),
fmt::format("failed to delete dir {}",
full_path(prefix)));
@@ -249,8 +253,8 @@ Status S3FileSystem::batch_delete_impl(const
std::vector<Path>& remote_files) {
}
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = client->DeleteObjects(delete_request);
- s3_bvar::s3_delete_total << 1;
if (UNLIKELY(!delete_outcome.IsSuccess())) {
return s3fs_error(
delete_outcome.GetError(),
@@ -276,8 +280,8 @@ Status S3FileSystem::exists_impl(const Path& path, bool*
res) const {
Aws::S3::Model::HeadObjectRequest request;
request.WithBucket(_s3_conf.bucket).WithKey(key);
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
auto outcome = client->HeadObject(request);
- s3_bvar::s3_head_total << 1;
if (outcome.IsSuccess()) {
*res = true;
} else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -297,8 +301,8 @@ Status S3FileSystem::file_size_impl(const Path& file,
int64_t* file_size) const
GET_KEY(key, file);
request.WithBucket(_s3_conf.bucket).WithKey(key);
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
auto outcome = client->HeadObject(request);
- s3_bvar::s3_head_total << 1;
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to get file size {}",
full_path(key)));
@@ -324,8 +328,11 @@ Status S3FileSystem::list_impl(const Path& dir, bool
only_file, std::vector<File
request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
bool is_trucated = false;
do {
- auto outcome = client->ListObjectsV2(request);
- s3_bvar::s3_list_total << 1;
+ Aws::S3::Model::ListObjectsV2Outcome outcome;
+ {
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
+ outcome = client->ListObjectsV2(request);
+ }
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to list {}",
full_path(prefix)));
@@ -425,8 +432,11 @@ Status S3FileSystem::download_impl(const Path&
remote_file, const Path& local_fi
GET_KEY(key, remote_file);
Aws::S3::Model::GetObjectRequest request;
request.WithBucket(_s3_conf.bucket).WithKey(key);
- Aws::S3::Model::GetObjectOutcome response = _client->GetObject(request);
- s3_bvar::s3_get_total << 1;
+ Aws::S3::Model::GetObjectOutcome response;
+ {
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+ response = _client->GetObject(request);
+ }
if (response.IsSuccess()) {
Aws::OFStream local_file_s;
local_file_s.open(local_file, std::ios::out | std::ios::binary);
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 8ec6bddafd1..0ec3a46f808 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -52,19 +52,16 @@
#include "io/fs/path.h"
#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
+#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"
-namespace Aws {
-namespace S3 {
-namespace Model {
+namespace Aws::S3::Model {
class DeleteObjectRequest;
-} // namespace Model
-} // namespace S3
-} // namespace Aws
+} // namespace Aws::S3::Model
using Aws::S3::Model::AbortMultipartUploadRequest;
using Aws::S3::Model::CompletedPart;
@@ -74,8 +71,7 @@ using Aws::S3::Model::CreateMultipartUploadRequest;
using Aws::S3::Model::UploadPartRequest;
using Aws::S3::Model::UploadPartOutcome;
-namespace doris {
-namespace io {
+namespace doris::io {
using namespace Aws::S3::Model;
using Aws::S3::S3Client;
@@ -126,8 +122,8 @@ Status S3FileWriter::_create_multi_upload_request() {
_bucket, _path.native(), _upload_id);
});
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto outcome = _client->CreateMultipartUpload(create_request);
- s3_bvar::s3_multi_part_upload_total << 1;
if (outcome.IsSuccess()) {
_upload_id = outcome.GetResult().GetUploadId();
@@ -175,8 +171,8 @@ Status S3FileWriter::_abort() {
_wait_until_finish("Abort");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto outcome = _client->AbortMultipartUpload(request);
- s3_bvar::s3_multi_part_upload_total << 1;
if (outcome.IsSuccess() ||
outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD
||
outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -324,10 +320,11 @@ void S3FileWriter::_upload_one_part(int64_t part_num,
UploadFileBuffer& buf) {
upload_request.SetContentLength(buf.get_size());
upload_request.SetContentType("application/octet-stream");
- auto upload_part_callable = _client->UploadPartCallable(upload_request);
- s3_bvar::s3_multi_part_upload_total << 1;
-
- UploadPartOutcome upload_part_outcome = upload_part_callable.get();
+ UploadPartOutcome upload_part_outcome;
+ {
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+ upload_part_outcome = _client->UploadPart(upload_request);
+ }
DBUG_EXECUTE_IF("s3_file_writer::_upload_one_part", {
if (part_num > 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
@@ -413,8 +410,8 @@ Status S3FileWriter::_complete() {
LOG_WARNING(s.to_string());
return s;
});
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
- s3_bvar::s3_multi_part_upload_total << 1;
if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(complete_outcome.GetError(),
@@ -464,8 +461,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
LOG(WARNING) << _st;
return;
});
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
auto response = _client->PutObject(request);
- s3_bvar::s3_put_total << 1;
if (!response.IsSuccess()) {
_st = s3fs_error(response.GetError(), fmt::format("failed to put
object {}, upload_id={}",
_path.native(),
_upload_id));
@@ -477,5 +474,4 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
s3_file_created_total << 1;
}
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 80bce1d4383..82a1b4097c6 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -42,15 +42,15 @@
namespace doris {
namespace s3_bvar {
-bvar::Adder<uint64_t> s3_get_total("s3_get", "total_num");
-bvar::Adder<uint64_t> s3_put_total("s3_put", "total_num");
-bvar::Adder<uint64_t> s3_delete_total("s3_delete", "total_num");
-bvar::Adder<uint64_t> s3_head_total("s3_head", "total_num");
-bvar::Adder<uint64_t> s3_multi_part_upload_total("s3_multi_part_upload",
"total_num");
-bvar::Adder<uint64_t> s3_list_total("s3_list", "total_num");
-bvar::Adder<uint64_t> s3_list_object_versions_total("s3_list_object_versions",
"total_num");
-bvar::Adder<uint64_t> s3_get_bucket_version_total("s3_get_bucket_version",
"total_num");
-bvar::Adder<uint64_t> s3_copy_object_total("s3_copy_object", "total_num");
+bvar::LatencyRecorder s3_get_latency("s3_get");
+bvar::LatencyRecorder s3_put_latency("s3_put");
+bvar::LatencyRecorder s3_delete_latency("s3_delete");
+bvar::LatencyRecorder s3_head_latency("s3_head");
+bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload");
+bvar::LatencyRecorder s3_list_latency("s3_list");
+bvar::LatencyRecorder
s3_list_object_versions_latency("s3_list_object_versions");
+bvar::LatencyRecorder s3_get_bucket_version_latency("s3_get_bucket_version");
+bvar::LatencyRecorder s3_copy_object_latency("s3_copy_object");
}; // namespace s3_bvar
class DorisAWSLogger final : public Aws::Utils::Logging::LogSystemInterface {
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 873f6b06f97..0727128c06b 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -19,6 +19,7 @@
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
+#include <bvar/bvar.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <stdint.h>
@@ -45,15 +46,15 @@ class Adder;
namespace doris {
namespace s3_bvar {
-extern bvar::Adder<uint64_t> s3_get_total;
-extern bvar::Adder<uint64_t> s3_put_total;
-extern bvar::Adder<uint64_t> s3_delete_total;
-extern bvar::Adder<uint64_t> s3_head_total;
-extern bvar::Adder<uint64_t> s3_multi_part_upload_total;
-extern bvar::Adder<uint64_t> s3_list_total;
-extern bvar::Adder<uint64_t> s3_list_object_versions_total;
-extern bvar::Adder<uint64_t> s3_get_bucket_version_total;
-extern bvar::Adder<uint64_t> s3_copy_object_total;
+extern bvar::LatencyRecorder s3_get_latency;
+extern bvar::LatencyRecorder s3_put_latency;
+extern bvar::LatencyRecorder s3_delete_latency;
+extern bvar::LatencyRecorder s3_head_latency;
+extern bvar::LatencyRecorder s3_multi_part_upload_latency;
+extern bvar::LatencyRecorder s3_list_latency;
+extern bvar::LatencyRecorder s3_list_object_versions_latency;
+extern bvar::LatencyRecorder s3_get_bucket_version_latency;
+extern bvar::LatencyRecorder s3_copy_object_latency;
}; // namespace s3_bvar
class S3URI;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]