This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 4b445d9f22e [enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987)
4b445d9f22e is described below
commit 4b445d9f22e40218a5f1795e823d2648c6193c45
Author: AlexYue <[email protected]>
AuthorDate: Fri Dec 22 14:17:18 2023 +0800
[enhance](S3) Add timeout config for s3 buffer allocation #26125 (#27987)
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 +++
be/src/io/fs/s3_file_write_bufferpool.cpp | 18 ++++++++++++++----
be/src/io/fs/s3_file_write_bufferpool.h | 2 +-
be/src/io/fs/s3_file_writer.cpp | 3 ++-
5 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 071422b57da..6acb373928b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1108,6 +1108,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
DEFINE_Bool(enable_snapshot_action, "false");
+DEFINE_mInt32(s3_writer_buffer_allocation_timeout_second, "60");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b962a79075e..24a7340063d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1165,6 +1165,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);
+// The timeout config for S3 write buffer allocation
+DECLARE_mInt32(s3_writer_buffer_allocation_timeout_second);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index 48887f9c6ea..5c5aa662316 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -17,6 +17,7 @@
#include "s3_file_write_bufferpool.h"
+#include <chrono>
#include <cstring>
#include "common/config.h"
@@ -40,7 +41,7 @@ void S3FileBuffer::on_finished() {
// when there is memory preserved, directly write data to buf
// TODO:(AlexYue): write to file cache otherwise, then we'll wait for free
buffer
// and to rob it
-void S3FileBuffer::append_data(const Slice& data) {
+Status S3FileBuffer::append_data(const Slice& data) {
Defer defer {[&] { _size += data.get_size(); }};
while (true) {
// if buf is not empty, it means there is memory preserved for this buf
@@ -50,9 +51,14 @@ void S3FileBuffer::append_data(const Slice& data) {
} else {
// wait allocate buffer pool
auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+ if (tmp->get_size() == 0) {
+ return Status::InternalError("Failed to allocate s3 writer
buffer for {} seconds",
+
config::s3_writer_buffer_allocation_timeout_second);
+ }
rob_buffer(tmp);
}
}
+ return Status::OK();
}
void S3FileBuffer::submit() {
@@ -81,13 +87,17 @@ void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write
std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
std::shared_ptr<S3FileBuffer> buf =
std::make_shared<S3FileBuffer>(_thread_pool);
+ int64_t timeout = config::s3_writer_buffer_allocation_timeout_second;
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
std::unique_lock<std::mutex> lck {_lock};
- _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
- buf->reserve_buffer(_free_raw_buffers.front());
- _free_raw_buffers.pop_front();
+ _cv.wait_for(lck, std::chrono::seconds(timeout),
+ [this]() { return !_free_raw_buffers.empty(); });
+ if (!_free_raw_buffers.empty()) {
+ buf->reserve_buffer(_free_raw_buffers.front());
+ _free_raw_buffers.pop_front();
+ }
}
return buf;
}
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index ad5f698f983..b4d3f322904 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -52,7 +52,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
// append data into the memory buffer inside
// or into the file cache if the buffer has no memory buffer
- void append_data(const Slice& data);
+ Status append_data(const Slice& data);
// upload to S3 and file cache in async threadpool
void submit();
// set the callback to upload to S3 file
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 4a937f52057..18de6ed0389 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -240,7 +240,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
// if the buffer has memory buf inside, the data would be written
into memory first then S3 then file cache
// it would be written to cache then S3 if the buffer doesn't have
memory preserved
- _pending_buf->append_data(Slice {data[i].get_data() + pos,
data_size_to_append});
+ RETURN_IF_ERROR(_pending_buf->append_data(
+ Slice {data[i].get_data() + pos, data_size_to_append}));
// if it's the last part, it could be less than 5MB, or it must
// satisfy that the size is larger than or euqal to 5MB
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]