This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 63c5625b783 [Improvement]Add more load cpu usage to workload group
(#42053)
63c5625b783 is described below
commit 63c5625b783d3312fce59e997115b9de8367c808
Author: wangbo <[email protected]>
AuthorDate: Wed Oct 23 10:54:29 2024 +0800
[Improvement]Add more load cpu usage to workload group (#42053)
## Proposed changes
Account more load-related CPU usage to the workload group:
1. AsyncResultWriter's CPU usage.
2. Memtable flush's CPU usage when the memtable is not on the sink side.
---
be/src/olap/delta_writer.cpp | 7 ++++++-
be/src/olap/delta_writer_v2.cpp | 7 +++----
be/src/olap/memtable_flush_executor.cpp | 28 ++++++++++++--------------
be/src/olap/memtable_flush_executor.h | 13 ++++++------
be/src/olap/memtable_writer.cpp | 18 ++++-------------
be/src/olap/memtable_writer.h | 3 ++-
be/src/runtime/query_context.h | 2 +-
be/src/runtime/workload_group/workload_group.h | 11 ++++++++++
be/src/vec/sink/writer/async_result_writer.cpp | 11 ++++++++++
9 files changed, 58 insertions(+), 42 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 00c622df59f..e0e3a5281bc 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -103,10 +103,15 @@ Status BaseDeltaWriter::init() {
if (_is_init) {
return Status::OK();
}
+ auto* t_ctx = doris::thread_context(true);
+ std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
+ if (t_ctx) {
+ wg_sptr = t_ctx->workload_group().lock();
+ }
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_memtable_writer->init(
_rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
- _rowset_builder->get_partial_update_info(), nullptr,
+ _rowset_builder->get_partial_update_info(), wg_sptr,
_rowset_builder->tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 0e83de2ca17..a6fb0154489 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -127,13 +127,12 @@ Status DeltaWriterV2::init() {
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
- ThreadPool* wg_thread_pool_ptr = nullptr;
+ std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (_state->get_query_ctx()) {
- wg_thread_pool_ptr =
_state->get_query_ctx()->get_memtable_flush_pool();
+ wg_sptr = _state->get_query_ctx()->workload_group();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
_partial_update_info,
- wg_thread_pool_ptr,
-
_streams[0]->enable_unique_mow(_req.index_id)));
+ wg_sptr,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index dc911647be8..5cdb45281b9 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -100,7 +100,16 @@ Status FlushToken::submit(std::shared_ptr<MemTable>
mem_table) {
int64_t submit_task_time = MonotonicNanos();
auto task = MemtableFlushTask::create_shared(
shared_from_this(), mem_table,
_rowset_writer->allocate_segment_id(), submit_task_time);
- Status ret = _thread_pool->submit(std::move(task));
+ // NOTE: we should guarantee WorkloadGroup is not deconstructed when
submit memtable flush task.
+ // because currently WorkloadGroup's can only be destroyed when all
queries in the group is finished,
+ // but not consider whether load channel is finish.
+ std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
+ ThreadPool* wg_thread_pool = nullptr;
+ if (wg_sptr) {
+ wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr();
+ }
+ Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
+ : _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no
need to notify _cond here
_stats.flush_running_count++;
@@ -236,7 +245,8 @@ void MemTableFlushExecutor::init(int num_disk) {
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>&
flush_token,
std::shared_ptr<RowsetWriter>
rowset_writer,
- bool is_high_priority) {
+ bool is_high_priority,
+
std::shared_ptr<WorkloadGroup> wg_sptr) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
// alpha rowset do not support flush in CONCURRENT. and not support
alpha rowset now.
@@ -244,7 +254,7 @@ Status
MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
case BETA_ROWSET: {
// beta rowset can be flush in CONCURRENT, because each memtable using
a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() :
_flush_pool.get();
- flush_token = FlushToken::create_shared(pool);
+ flush_token = FlushToken::create_shared(pool, wg_sptr);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
@@ -253,18 +263,6 @@ Status
MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
}
}
-Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>&
flush_token,
- std::shared_ptr<RowsetWriter>
rowset_writer,
- ThreadPool*
wg_flush_pool_ptr) {
- if (rowset_writer->type() == BETA_ROWSET) {
- flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
- } else {
- return Status::InternalError<false>("not support alpha rowset load
now.");
- }
- flush_token->set_rowset_writer(rowset_writer);
- return Status::OK();
-}
-
void MemTableFlushExecutor::_register_metrics() {
REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
[this]() { return _flush_pool->get_queue_size(); });
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 25c5a37afba..27e8e8a9b0e 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -34,6 +34,7 @@ namespace doris {
class DataDir;
class MemTable;
class RowsetWriter;
+class WorkloadGroup;
// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
@@ -59,7 +60,8 @@ class FlushToken : public
std::enable_shared_from_this<FlushToken> {
ENABLE_FACTORY_CREATOR(FlushToken);
public:
- FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()),
_thread_pool(thread_pool) {}
+ FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
+ : _flush_status(Status::OK()), _thread_pool(thread_pool),
_wg_wptr(wg_sptr) {}
Status submit(std::shared_ptr<MemTable> mem_table);
@@ -108,6 +110,8 @@ private:
std::mutex _mutex;
std::condition_variable _cond;
+
+ std::weak_ptr<WorkloadGroup> _wg_wptr;
};
// MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -133,11 +137,8 @@ public:
void init(int num_disk);
Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
- std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority);
-
- Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
- std::shared_ptr<RowsetWriter> rowset_writer,
- ThreadPool* wg_flush_pool_ptr);
+ std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority,
+ std::shared_ptr<WorkloadGroup> wg_sptr);
private:
void _register_metrics();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index e8123c48ecc..88532646b66 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo>
partial_update_info,
- ThreadPool* wg_flush_pool_ptr, bool
unique_key_mow) {
+ std::shared_ptr<WorkloadGroup> wg_sptr, bool
unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
@@ -77,19 +77,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter>
rowset_writer,
// create flush handler
// by assigning segment_id to memtable before submiting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
- if (wg_flush_pool_ptr) {
- RETURN_IF_ERROR(
- ExecEnv::GetInstance()
- ->storage_engine()
- .memtable_flush_executor()
- ->create_flush_token(_flush_token, _rowset_writer,
wg_flush_pool_ptr));
- } else {
- RETURN_IF_ERROR(
- ExecEnv::GetInstance()
- ->storage_engine()
- .memtable_flush_executor()
- ->create_flush_token(_flush_token, _rowset_writer,
_req.is_high_priority));
- }
+ RETURN_IF_ERROR(
+
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
+ _flush_token, _rowset_writer, _req.is_high_priority,
wg_sptr));
_is_init = true;
return Status::OK();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ec44348b4a9..fb07e740fa3 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -52,6 +52,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class RowsetWriter;
struct FlushStatistic;
+class WorkloadGroup;
namespace vectorized {
class Block;
@@ -67,7 +68,7 @@ public:
Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr
tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
- ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
+ std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow =
false);
Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 9d499f3487e..1a05b784d5b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -255,7 +255,7 @@ private:
// And will be shared by all instances of this query.
// So that we can control the max thread that a query can be used to
execute.
// If this token is not set, the scanner will be executed in
"_scan_thread_pool" in exec env.
- std::unique_ptr<ThreadPoolToken> _thread_token;
+ std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
void _init_query_mem_tracker();
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 933c5afdb4e..2ba84ce982b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -198,6 +198,17 @@ public:
}
int64_t get_remote_scan_bytes_per_second();
+ CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
+ std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+ return _cgroup_cpu_ctl.get();
+ }
+
+ ThreadPool* get_memtable_flush_pool_ptr() {
+ // no lock here because this is called by memtable flush,
+ // to avoid lock competition with the workload thread pool's update
+ return _memtable_flush_pool.get();
+ }
+
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 16dcbc648fb..432ec1c54b5 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -107,6 +107,17 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
force_close(status);
}
+ if (state && state->get_query_ctx()) {
+ WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
+ if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
+ Status ret =
wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
+ if (ret.ok()) {
+ std::string wg_tname = "asyc_wr_" + wg_ptr->name();
+ Thread::set_self_name(wg_tname);
+ }
+ }
+ }
+
DCHECK(_dependency);
if (_writer_status.ok()) {
while (true) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]