This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 493dc26d743 [opt](s3) auto retry when meeting 429 error (#35396)
493dc26d743 is described below
commit 493dc26d7435d9e78fe87a873b5174df9e64a886
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jun 27 16:01:23 2024 +0800
[opt](s3) auto retry when meeting 429 error (#35396)
Sometimes the s3 sdk will return an error like:
```
QpsLimitExceeded Unable to parse ExceptionName: QpsLimitExceeded Message:
Please reduce your request rate. code=429 type=100, request_id=66516C288EC49
```
We should slow down the request rate by sleeping for a while.
- Add 2 new BE configs
  - `s3_read_base_wait_time_ms` and `s3_read_max_wait_time_ms`
    When an s3 429 error is met, the "get" request will sleep for
    `s3_read_base_wait_time_ms` (*2, *4, *8, ...) ms and then try again
    (see the sketch after this list).
    The max sleep time is `s3_read_max_wait_time_ms`
    and the max number of retries is `max_s3_client_retry`.
- Add more metrics for s3 file reader
  - `s3_file_reader_too_many_request`: counter of 429 errors.
  - `s3_file_reader_s3_get_request`: the QPS of s3 get requests.
  - `TotalGetRequest`: get request counter in profile.
  - `TooManyRequestErr`: 429 error counter in profile.
  - `TooManyRequestSleepTime`: sum of sleep time after 429 errors in profile.
  - `TotalBytesRead`: total bytes read from s3 in profile.
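
For reference, here is a minimal standalone sketch of the backoff policy described above. The config names in the comments are the real BE configs; `do_get_request`, `get_with_backoff`, and the `k*` constants are illustrative stubs, not actual BE code:

```cpp
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <thread>

// Illustrative stand-ins for the BE configs; values match this patch's defaults.
constexpr int kBaseWaitTimeMs = 100; // s3_read_base_wait_time_ms
constexpr int kMaxWaitTimeMs = 800;  // s3_read_max_wait_time_ms
constexpr int kMaxRetries = 10;      // max_s3_client_retry

// Hypothetical stub for a single s3 GET: pretend the first two attempts
// are throttled with HTTP 429, then the request succeeds.
int do_get_request() {
    static int calls = 0;
    return ++calls <= 2 ? 429 : 200;
}

// Retry a GET with capped exponential backoff on 429, mirroring the loop
// this patch adds to S3FileReader::read_at_impl.
bool get_with_backoff() {
    int retry_count = 0;
    while (retry_count <= kMaxRetries) {
        if (do_get_request() != 429) {
            return true; // success (other error handling omitted)
        }
        retry_count++;
        // Sleep base * 2^retry_count ms, capped: with the defaults above
        // the waits are 200ms, 400ms, 800ms, 800ms, ...
        int wait_ms = std::min(kBaseWaitTimeMs * (1 << retry_count), kMaxWaitTimeMs);
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
    }
    return false; // exceeded maximum retries
}

int main() {
    std::printf("read %s\n", get_with_backoff() ? "succeeded" : "failed");
}
```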

---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 7 +++
be/src/io/file_factory.cpp | 2 +-
be/src/io/fs/s3_file_reader.cpp | 109 ++++++++++++++++++++++++++++++++--------
be/src/io/fs/s3_file_reader.h | 24 +++++++--
be/src/io/fs/s3_file_system.cpp | 2 +-
6 files changed, 120 insertions(+), 26 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4460e477c8f..a08bb43db56 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1222,6 +1222,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
DEFINE_mInt32(max_s3_client_retry, "10");
+DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
+DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
DEFINE_mBool(enable_s3_rate_limiter, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dbf18002704..6942e316dcd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1304,6 +1304,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
DECLARE_mBool(enable_s3_rate_limiter);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
+// When an s3 429 error is met, the "get" request will
+// sleep s3_read_base_wait_time_ms (*2, *4, *8, ...) ms
+// and then try again.
+// The max sleep time is s3_read_max_wait_time_ms
+// and the max number of retries is max_s3_client_retry
+DECLARE_mInt32(s3_read_base_wait_time_ms);
+DECLARE_mInt32(s3_read_max_wait_time_ms);
// write as inverted index tmp directory
DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index e61ed144486..0c84c2eb74c 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -156,7 +156,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
            auto client_holder = std::make_shared<io::ObjClientHolder>(s3_conf.client_conf);
            RETURN_IF_ERROR_RESULT(client_holder->init());
            return io::S3FileReader::create(std::move(client_holder), s3_conf.bucket, s3_uri.get_key(),
-                                           file_description.file_size)
+                                           file_description.file_size, profile)
                    .and_then([&](auto&& reader) {
                        return io::create_cached_file_reader(std::move(reader), reader_options);
                    });
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index e7775803198..a5c6ec09162 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -37,6 +37,7 @@
#include "io/fs/s3_common.h"
#include "util/bvar_helper.h"
#include "util/doris_metrics.h"
+#include "util/runtime_profile.h"
#include "util/s3_util.h"
namespace doris::io {
@@ -45,13 +46,18 @@ bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
+bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                           &s3_bytes_read_total);
+// Although we can get QPS from s3_bytes_per_read, s3_bytes_per_read only
+// records successful requests, while s3_get_request_qps records all requests.
+bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
+                                                          &s3_file_reader_read_counter);
Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
-                                            std::string bucket, std::string key,
-                                            int64_t file_size) {
+                                            std::string bucket, std::string key, int64_t file_size,
+                                            RuntimeProfile* profile) {
if (file_size < 0) {
auto res = client->object_file_size(bucket, key);
if (!res.has_value()) {
@@ -62,16 +68,17 @@ Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolde
    }
    return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
-                                          file_size);
+                                          file_size, profile);
}
S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
-                           std::string key, size_t file_size)
+                           std::string key, size_t file_size, RuntimeProfile* profile)
        : _path(fmt::format("s3://{}/{}", bucket, key)),
          _file_size(file_size),
          _bucket(std::move(bucket)),
          _key(std::move(key)),
-          _client(std::move(client)) {
+          _client(std::move(client)),
+          _profile(profile) {
DorisMetrics::instance()->s3_file_open_reading->increment(1);
DorisMetrics::instance()->s3_file_reader_total->increment(1);
s3_file_reader_total << 1;
@@ -113,23 +120,85 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
if (!client) {
return Status::InternalError("init s3 client error");
}
-    // clang-format off
-    auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-            to, offset, bytes_req, bytes_read);
-    // clang-format on
-    if (resp.status.code != ErrorCode::OK) {
-        return std::move(Status(resp.status.code, std::move(resp.status.msg))
-                                 .append(fmt::format("failed to read from {}", _path.native())));
+    // // clang-format off
+    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+    //         to, offset, bytes_req, bytes_read);
+    // // clang-format on
+    // if (resp.status.code != ErrorCode::OK) {
+    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
+    //                              .append(fmt::format("failed to read from {}", _path.native())));
+    // }
+    // if (*bytes_read != bytes_req) {
+    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+    //                                  _path.native(), *bytes_read, bytes_req);
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+
+    int retry_count = 0;
+    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
+    const int max_wait_time = config::s3_read_max_wait_time_ms;   // Maximum wait time in milliseconds
+    const int max_retries = config::max_s3_client_retry; // wait time doubles on each retry, capped
+
+ int total_sleep_time = 0;
+    while (retry_count <= max_retries) {
+        s3_file_reader_read_counter << 1;
+        // clang-format off
+        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+                to, offset, bytes_req, bytes_read);
+        // clang-format on
+        _s3_stats.total_get_request_counter++;
+        if (resp.status.code != ErrorCode::OK) {
+            if (resp.http_code ==
+                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
+                s3_file_reader_too_many_request_counter << 1;
+                retry_count++;
+                int wait_time = std::min(base_wait_time * (1 << retry_count),
+                                         max_wait_time); // Exponential backoff
+                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+                _s3_stats.too_many_request_err_counter++;
+                _s3_stats.too_many_request_sleep_time_ms += wait_time;
+                total_sleep_time += wait_time;
+                continue;
+            } else {
+                // Handle other errors
+                return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                                         .append("failed to read"));
+            }
+        }
+        if (*bytes_read != bytes_req) {
+            return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
+                                         *bytes_read, bytes_req);
+        }
+        _s3_stats.total_bytes_read += bytes_req;
+        s3_bytes_read_total << bytes_req;
+        s3_bytes_per_read << bytes_req;
+        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
+        if (retry_count > 0) {
+            LOG(INFO) << fmt::format("read s3 file {} succeeded after {} retries with {} ms of sleep",
+                                     _path.native(), retry_count, total_sleep_time);
+        }
+        return Status::OK();
    }
-    if (*bytes_read != bytes_req) {
-        return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-                                     _path.native(), *bytes_read, bytes_req);
+    return Status::InternalError("failed to read from s3, exceeded maximum retries");
+}
+
+void S3FileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        const char* s3_profile_name = "S3Profile";
+        ADD_TIMER(_profile, s3_profile_name);
+        RuntimeProfile::Counter* total_get_request_counter =
+                ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_err_counter =
+                ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
+                _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
+        RuntimeProfile::Counter* total_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);
+
+        COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
+        COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
+        COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
+        COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
    }
-    s3_bytes_read_total << *bytes_read;
-    s3_bytes_per_read << *bytes_read;
-    s3_file_reader_read_counter << 1;
-    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
-    return Status::OK();
}
} // namespace doris::io
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index d681161ebed..36fe67b342c 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -28,16 +28,20 @@
#include "io/fs/s3_file_system.h"
#include "util/slice.h"
-namespace doris::io {
+namespace doris {
+class RuntimeProfile;
+
+namespace io {
struct IOContext;
class S3FileReader final : public FileReader {
public:
    static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
-                                         std::string bucket, std::string key, int64_t file_size);
+                                         std::string bucket, std::string key, int64_t file_size,
+                                         RuntimeProfile* profile);
    S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
-                 size_t file_size);
+                 size_t file_size, RuntimeProfile* profile);
~S3FileReader() override;
@@ -53,7 +57,15 @@ protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
+ void _collect_profile_before_close() override;
+
private:
+ struct S3Statistics {
+ int64_t total_get_request_counter = 0;
+ int64_t too_many_request_err_counter = 0;
+ int64_t too_many_request_sleep_time_ms = 0;
+ int64_t total_bytes_read = 0;
+ };
Path _path;
size_t _file_size;
@@ -62,6 +74,10 @@ private:
std::shared_ptr<const ObjClientHolder> _client;
std::atomic<bool> _closed = false;
+
+ RuntimeProfile* _profile = nullptr;
+ S3Statistics _s3_stats;
};
-} // namespace doris::io
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 2cc82d2da0f..93f36429485 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -196,7 +196,7 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
                                        const FileReaderOptions& opts) {
auto key = DORIS_TRY(get_key(file));
-    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size));
+    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
return Status::OK();
}