This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b0c215e694 [enhance](be)add more profile in prefetched buffered reader
(#19119)
b0c215e694 is described below
commit b0c215e694811822898919c5cfe94e525c41bb14
Author: AlexYue <[email protected]>
AuthorDate: Tue May 2 09:53:39 2023 +0800
[enhance](be)add more profile in prefetched buffered reader (#19119)
---
be/src/io/fs/buffered_reader.cpp | 54 +++++++++++++++++++++++++++++-----
be/src/io/fs/buffered_reader.h | 24 +++++++++++----
be/src/service/internal_service.cpp | 5 +++-
be/test/io/fs/buffered_reader_test.cpp | 12 +++++---
4 files changed, 77 insertions(+), 18 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index dab5ab8f61..f57aa16f86 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -406,8 +406,13 @@ void PrefetchBuffer::prefetch_buffer() {
buf_size = merge_small_ranges(_offset, read_range_index);
}
- s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len,
_io_ctx);
+ {
+ SCOPED_RAW_TIMER(&_statis.read_time);
+ s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len,
_io_ctx);
+ }
g_bytes_downloaded << _len;
+ _statis.prefetch_request_io += 1;
+ _statis.prefetch_request_bytes += _len;
std::unique_lock lck {_lock};
_prefetched.wait(lck, [this]() { return _buffer_status ==
BufferStatus::PENDING; });
if (!s.ok() && _offset < _reader->size()) {
@@ -506,8 +511,13 @@ Status PrefetchBuffer::read_buffer(size_t off, const char*
out, size_t buf_len,
}
// [0]: maximum len trying to read, [1] maximum length buffer can provide,
[2] actual len buffer has
size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len
- off});
- memcpy((void*)out, _buf.get() + (off - _offset), read_len);
+ {
+ SCOPED_RAW_TIMER(&_statis.copy_time);
+ memcpy((void*)out, _buf.get() + (off - _offset), read_len);
+ }
*bytes_read = read_len;
+ _statis.request_io += 1;
+ _statis.request_bytes += read_len;
if (off + *bytes_read == _offset + _len) {
reset_offset(_offset + _whole_buffer_size);
}
@@ -520,11 +530,15 @@ void PrefetchBuffer::close() {
_prefetched.wait(lck, [this]() { return _buffer_status !=
BufferStatus::PENDING; });
_buffer_status = BufferStatus::CLOSED;
_prefetched.notify_all();
+ if (_sync_profile != nullptr) {
+ _sync_profile(*this);
+ }
}
// buffered reader
-PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader,
PrefetchRange file_range,
- const IOContext* io_ctx,
int64_t buffer_size)
+PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile,
io::FileReaderSPtr reader,
+ PrefetchRange file_range, const
IOContext* io_ctx,
+ int64_t buffer_size)
: _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx)
{
if (buffer_size == -1L) {
buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
@@ -533,12 +547,35 @@
PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, Prefet
_whole_pre_buffer_size = buffer_size;
_file_range.end_offset = std::min(_file_range.end_offset, _size);
int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size /
s_max_pre_buffer_size : 1;
+ std::function<void(PrefetchBuffer&)> sync_buffer = nullptr;
+ if (profile != nullptr) {
+ const char* prefetch_buffered_reader = "PrefetchBufferedReader";
+ ADD_TIMER(profile, prefetch_buffered_reader);
+ auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime",
prefetch_buffered_reader);
+ auto read_time = ADD_CHILD_TIMER(profile, "ReadTime",
prefetch_buffered_reader);
+ auto prefetch_request_io =
+ ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT,
prefetch_buffered_reader);
+ auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile,
"PreRequestBytes", TUnit::BYTES,
+
prefetch_buffered_reader);
+ auto request_io =
+ ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT,
prefetch_buffered_reader);
+ auto request_bytes =
+ ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES,
prefetch_buffered_reader);
+ sync_buffer = [=](PrefetchBuffer& buf) {
+ COUNTER_UPDATE(copy_time, buf._statis.copy_time);
+ COUNTER_UPDATE(read_time, buf._statis.read_time);
+ COUNTER_UPDATE(prefetch_request_io,
buf._statis.prefetch_request_io);
+ COUNTER_UPDATE(prefetch_request_bytes,
buf._statis.prefetch_request_bytes);
+ COUNTER_UPDATE(request_io, buf._statis.request_io);
+ COUNTER_UPDATE(request_bytes, buf._statis.request_bytes);
+ };
+ }
// set the _cur_offset of this reader as same as the inner reader's,
// to make sure the buffer reader will start to read at right position.
for (int i = 0; i < buffer_num; i++) {
- _pre_buffers.emplace_back(
- std::make_shared<PrefetchBuffer>(_file_range,
s_max_pre_buffer_size,
- _whole_pre_buffer_size,
_reader.get(), _io_ctx));
+ _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
+ _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size,
_reader.get(), _io_ctx,
+ sync_buffer));
}
}
@@ -690,7 +727,8 @@ Status DelegateReader::create_file_reader(RuntimeProfile*
profile,
*file_reader = std::make_shared<InMemoryFileReader>(reader);
} else if (access_mode == AccessMode::SEQUENTIAL) {
io::FileReaderSPtr safeReader =
std::make_shared<ThreadSafeReader>(reader);
- *file_reader =
std::make_shared<io::PrefetchBufferedReader>(safeReader, file_range, io_ctx);
+ *file_reader = std::make_shared<io::PrefetchBufferedReader>(profile,
safeReader, file_range,
+ io_ctx);
} else {
*file_reader = std::move(reader);
}
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 0208139ba1..f22789de8f 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -314,13 +314,15 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer> {
enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t
whole_buffer_size,
- io::FileReader* reader, const IOContext* io_ctx)
+ io::FileReader* reader, const IOContext* io_ctx,
+ std::function<void(PrefetchBuffer&)> sync_profile)
: _file_range(file_range),
_size(buffer_size),
_whole_buffer_size(whole_buffer_size),
_reader(reader),
_io_ctx(io_ctx),
- _buf(new char[buffer_size]) {}
+ _buf(new char[buffer_size]),
+ _sync_profile(sync_profile) {}
PrefetchBuffer(PrefetchBuffer&& other)
: _offset(other._offset),
@@ -330,7 +332,8 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer> {
_whole_buffer_size(other._whole_buffer_size),
_reader(other._reader),
_io_ctx(other._io_ctx),
- _buf(std::move(other._buf)) {}
+ _buf(std::move(other._buf)),
+ _sync_profile(std::move(other._sync_profile)) {}
~PrefetchBuffer() = default;
@@ -351,6 +354,16 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer> {
std::condition_variable _prefetched;
Status _prefetch_status {Status::OK()};
std::atomic_bool _exceed = false;
+ std::function<void(PrefetchBuffer&)> _sync_profile;
+ struct Statistics {
+ int64_t copy_time {0};
+ int64_t read_time {0};
+ int64_t prefetch_request_io {0};
+ int64_t prefetch_request_bytes {0};
+ int64_t request_io {0};
+ int64_t request_bytes {0};
+ };
+ Statistics _statis;
// @brief: reset the start offset of this buffer to offset
// @param: the new start offset for this buffer
@@ -396,8 +409,9 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer> {
*/
class PrefetchBufferedReader : public io::FileReader {
public:
- PrefetchBufferedReader(io::FileReaderSPtr reader, PrefetchRange file_range,
- const IOContext* io_ctx = nullptr, int64_t
buffer_size = -1L);
+ PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+ PrefetchRange file_range, const IOContext* io_ctx =
nullptr,
+ int64_t buffer_size = -1L);
~PrefetchBufferedReader() override;
Status close() override;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 0dbc26d6ac..ab639e6f29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -512,8 +512,11 @@ void
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
const TFileRangeDesc& range = file_scan_range.ranges.at(0);
const TFileScanRangeParams& params = file_scan_range.params;
+ // make sure profile is destructed after reader because
PrefetchBufferedReader
+ // might asynchronously access the profile
+ std::unique_ptr<RuntimeProfile> profile =
+ std::make_unique<RuntimeProfile>("FetchTableSchema");
std::unique_ptr<vectorized::GenericReader> reader(nullptr);
- std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
io::IOContext io_ctx;
io::FileCacheStatistics file_cache_statis;
io_ctx.file_cache_stats = &file_cache_statis;
diff --git a/be/test/io/fs/buffered_reader_test.cpp
b/be/test/io/fs/buffered_reader_test.cpp
index 35b0cc60e1..6a281e125f 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -124,7 +124,8 @@ TEST_F(BufferedReaderTest, normal_use) {
io::global_local_filesystem()->open_file(
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file",
&local_reader);
auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
- io::PrefetchBufferedReader reader(std::move(sync_local_reader),
io::PrefetchRange(0, 1024));
+ io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+ io::PrefetchRange(0, 1024));
uint8_t buf[1024];
Slice result {buf, 1024};
MonotonicStopWatch watch;
@@ -143,7 +144,8 @@ TEST_F(BufferedReaderTest, test_validity) {
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
&local_reader);
auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
- io::PrefetchBufferedReader reader(std::move(sync_local_reader),
io::PrefetchRange(0, 1024));
+ io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+ io::PrefetchRange(0, 1024));
Status st;
uint8_t buf[10];
Slice result {buf, 10};
@@ -192,7 +194,8 @@ TEST_F(BufferedReaderTest, test_seek) {
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
&local_reader);
auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
- io::PrefetchBufferedReader reader(std::move(sync_local_reader),
io::PrefetchRange(0, 1024));
+ io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+ io::PrefetchRange(0, 1024));
Status st;
uint8_t buf[10];
@@ -238,7 +241,8 @@ TEST_F(BufferedReaderTest, test_miss) {
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
&local_reader);
auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
- io::PrefetchBufferedReader reader(std::move(sync_local_reader),
io::PrefetchRange(0, 1024));
+ io::PrefetchBufferedReader reader(nullptr, std::move(sync_local_reader),
+ io::PrefetchRange(0, 1024));
uint8_t buf[128];
Slice result {buf, 128};
size_t bytes_read;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]