This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 62a64d10ae8 [enhancement](mem-tracker) Use thread local mem tracker to
track s3 file buffer memory usage (#40597)
62a64d10ae8 is described below
commit 62a64d10ae8d1d6431834f09ad296836b8cdedc3
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Sep 26 12:32:12 2024 +0800
[enhancement](mem-tracker) Use thread local mem tracker to track s3 file
buffer memory usage (#40597)
Track S3 file buffer memory usage with the thread-local memory tracker (when one
is set), so that memory usage is attributed in detail to the specific caller.
---
be/src/io/fs/s3_file_bufferpool.cpp | 25 +++++++++++++++++++------
be/src/io/fs/s3_file_bufferpool.h | 14 +++++++++-----
be/src/olap/tablet.cpp | 6 ++++++
be/src/olap/tablet.h | 3 +++
be/src/runtime/snapshot_loader.cpp | 5 +++++
be/src/runtime/snapshot_loader.h | 2 ++
6 files changed, 44 insertions(+), 11 deletions(-)
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp
b/be/src/io/fs/s3_file_bufferpool.cpp
index f1f90ea7f2e..0d59ea0ed88 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -31,6 +31,7 @@
#include "io/cache/file_cache_common.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
@@ -77,17 +78,19 @@ Slice FileBuffer::get_slice() const {
}
FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()>
alloc_holder,
- size_t offset, OperationState state)
+ size_t offset, OperationState state,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker)
: _type(type),
_alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
_inner_data(std::make_unique<FileBuffer::PartData>()),
- _capacity(_inner_data->size()) {}
+ _capacity(_inner_data->size()),
+ _mem_tracker(std::move(mem_tracker)) {}
FileBuffer::~FileBuffer() {
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_inner_data.reset();
}
@@ -240,13 +243,22 @@ FileBufferBuilder&
FileBufferBuilder::set_allocate_file_blocks_holder(
}
Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker();
+ auto* thread_ctx = doris::thread_context(true);
+ if (thread_ctx != nullptr) {
+ // if thread local mem tracker is set, use it instead.
+ auto curr_tracker =
thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker();
+ if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) {
+ mem_tracker = std::move(curr_tracker);
+ }
+ }
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
OperationState state(_sync_after_complete_task, _is_cancelled);
if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb),
std::move(state), _offset,
- std::move(_alloc_holder_cb)));
+ std::move(_alloc_holder_cb),
std::move(mem_tracker)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
@@ -254,7 +266,8 @@ Status
FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
std::move(_download),
std::move(_write_to_local_file_cache),
std::move(_write_to_use_buffer),
std::move(state),
- _offset,
std::move(_alloc_holder_cb)));
+ _offset, std::move(_alloc_holder_cb),
+ std::move(mem_tracker)));
return Status::OK();
}
// should never come here
diff --git a/be/src/io/fs/s3_file_bufferpool.h
b/be/src/io/fs/s3_file_bufferpool.h
index 1b552850ae3..a603c3cb29a 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "io/cache/file_block.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "util/crc32c.h"
#include "util/slice.h"
#include "util/threadpool.h"
@@ -77,7 +78,7 @@ struct OperationState {
struct FileBuffer {
FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()>
alloc_holder, size_t offset,
- OperationState state);
+ OperationState state, std::shared_ptr<MemTrackerLimiter>
mem_tracker);
virtual ~FileBuffer();
/**
* submit the correspoding task to async executor
@@ -127,14 +128,16 @@ struct FileBuffer {
struct PartData;
std::unique_ptr<PartData> _inner_data;
size_t _capacity;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
struct DownloadFileBuffer final : public FileBuffer {
DownloadFileBuffer(std::function<Status(Slice&)> download,
std::function<void(FileBlocksHolderPtr, Slice)>
write_to_cache,
std::function<void(Slice, size_t)> write_to_use_buffer,
OperationState state,
- size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder)
- : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
+ size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker)
+ : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state,
std::move(mem_tracker)),
_download(std::move(download)),
_write_to_local_file_cache(std::move(write_to_cache)),
_write_to_use_buffer(std::move(write_to_use_buffer)) {}
@@ -153,8 +156,9 @@ struct DownloadFileBuffer final : public FileBuffer {
struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb,
OperationState state,
- size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder)
- : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
+ size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker)
+ : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state,
std::move(mem_tracker)),
_upload_to_remote(std::move(upload_cb)) {}
~UploadFileBuffer() override = default;
Status append_data(const Slice& s) override;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 51eabe5495e..ad2671962d8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -106,6 +106,7 @@
#include "olap/txn_manager.h"
#include "olap/types.h"
#include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "tablet.h"
@@ -268,6 +269,9 @@ Tablet::Tablet(StorageEngine& engine, TabletMetaSharedPtr
tablet_meta, DataDir*
_tablet_path = fmt::format("{}/{}/{}/{}/{}", _data_dir->path(),
DATA_PREFIX,
_tablet_meta->shard_id(), tablet_id(),
schema_hash());
}
+ _upload_cooldown_meta_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
+ fmt::format("UploadCoolDownMeta#tablet_id={}", tablet_id()));
}
bool Tablet::set_tablet_schema_into_rowset_meta() {
@@ -2100,6 +2104,8 @@ Status Tablet::write_cooldown_meta() {
_cooldown_conf.cooldown_replica_id,
tablet_id());
}
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_upload_cooldown_meta_tracker);
+
auto storage_resource =
DORIS_TRY(get_resource_by_storage_policy_id(storage_policy_id()));
std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 33253e82ced..71af08e61cd 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -46,6 +46,7 @@
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/version_graph.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "segment_loader.h"
#include "util/metrics.h"
#include "util/once.h"
@@ -608,6 +609,8 @@ private:
std::shared_ptr<const VersionWithTime> _visible_version;
std::atomic_bool _is_full_compaction_running = false;
+
+ std::shared_ptr<MemTrackerLimiter> _upload_cooldown_meta_tracker;
};
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index d04a5463879..0a13ac6085c 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -52,6 +52,7 @@
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
#include "util/thrift_rpc_helper.h"
@@ -115,6 +116,9 @@ Status SnapshotLoader::init(TStorageBackendType::type type,
const std::string& l
} else {
return Status::InternalError("Unknown storage type: {}", type);
}
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
+ fmt::format("SnapShotLoader#job_id={}#task_id={}", _job_id,
_task_id));
return Status::OK();
}
@@ -125,6 +129,7 @@ Status SnapshotLoader::upload(const std::map<std::string,
std::string>& src_to_d
if (!_remote_fs) {
return Status::InternalError("Storage backend not initialized.");
}
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
LOG(INFO) << "begin to upload snapshot files. num: " <<
src_to_dest_path.size()
<< ", broker addr: " << _broker_addr << ", job: " << _job_id <<
", task" << _task_id;
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 7b1d5a0d942..b6da1a2adae 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "olap/tablet_fwd.h"
+#include "runtime/memory/mem_tracker_limiter.h"
namespace doris {
namespace io {
@@ -111,6 +112,7 @@ private:
const TNetworkAddress _broker_addr;
const std::map<std::string, std::string> _prop;
std::shared_ptr<io::RemoteFileSystem> _remote_fs;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
} // end namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]