This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a8e3d21998 [enhance](PrefetchReader) Make the prefetch timeout one config (#27371)
0a8e3d21998 is described below
commit 0a8e3d21998969908ce5795cd9b9b9e1b2d28343
Author: AlexYue <[email protected]>
AuthorDate: Fri Nov 24 10:24:15 2023 +0800
[enhance](PrefetchReader) Make the prefetch timeout one config (#27371)
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 2 ++
be/src/io/fs/buffered_reader.cpp | 37 +++++++++++++++++++------------------
3 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b8ad3523b3c..194d64aee95 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1119,6 +1119,8 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f080063265c..1f6634d8da5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1197,6 +1197,8 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);
+DECLARE_mInt32(buffered_reader_read_timeout_ms);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 1c016a6b22e..2ac5dd13964 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -386,20 +386,15 @@ Status MergeRangeFileReader::_fill_box(int range_index,
size_t start_offset, siz
return Status::OK();
}
-// the condition variable would wait at most 10 seconds
-// otherwise it would quit the procedure and treat it
-// as one time out error status and would make the load
-// task failed
-constexpr static int WAIT_TIME_OUT_MS = 10000;
-
// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
void PrefetchBuffer::reset_offset(size_t offset) {
{
std::unique_lock lck {_lock};
- if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(WAIT_TIME_OUT_MS),
- [this]() { return _buffer_status !=
BufferStatus::PENDING; })) {
+ if (!_prefetched.wait_for(
+ lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+ [this]() { return _buffer_status != BufferStatus::PENDING;
})) {
_prefetch_status = Status::TimedOut("time out when reset prefetch
buffer");
return;
}
@@ -426,10 +421,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
void PrefetchBuffer::prefetch_buffer() {
{
std::unique_lock lck {_lock};
- if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
- return _buffer_status == BufferStatus::RESET ||
- _buffer_status == BufferStatus::CLOSED;
- })) {
+ if (!_prefetched.wait_for(
+ lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+ [this]() {
+ return _buffer_status == BufferStatus::RESET ||
+ _buffer_status == BufferStatus::CLOSED;
+ })) {
_prefetch_status = Status::TimedOut("time out when invoking
prefetch buffer");
return;
}
@@ -470,7 +467,8 @@ void PrefetchBuffer::prefetch_buffer() {
_statis.prefetch_request_io += 1;
_statis.prefetch_request_bytes += _len;
std::unique_lock lck {_lock};
- if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+ if (!_prefetched.wait_for(lck,
+
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status ==
BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch
buffer");
return;
@@ -551,10 +549,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const
char* out, size_t buf_len,
{
std::unique_lock lck {_lock};
// buffer must be prefetched or it's closed
- if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
- return _buffer_status == BufferStatus::PREFETCHED ||
- _buffer_status == BufferStatus::CLOSED;
- })) {
+ if (!_prefetched.wait_for(
+ lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
+ [this]() {
+ return _buffer_status == BufferStatus::PREFETCHED ||
+ _buffer_status == BufferStatus::CLOSED;
+ })) {
_prefetch_status = Status::TimedOut("time out when read prefetch
buffer");
return _prefetch_status;
}
@@ -590,7 +590,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char*
out, size_t buf_len,
void PrefetchBuffer::close() {
std::unique_lock lck {_lock};
// in case _reader still tries to write to the buf after we close the buffer
- if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
+ if (!_prefetched.wait_for(lck,
+
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status !=
BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when close prefetch
buffer");
return;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]