This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2b965303d8f [branch-3.0] pick some bugfix (#42128)
2b965303d8f is described below
commit 2b965303d8f92ffa66609264c300da1c71879242
Author: yiguolei <[email protected]>
AuthorDate: Sat Oct 19 18:19:28 2024 +0800
[branch-3.0] pick some bugfix (#42128)
---
be/src/common/config.cpp | 2 -
be/src/common/config.h | 2 -
be/src/common/daemon.cpp | 1 +
be/src/olap/memtable.cpp | 40 +++++------
be/src/olap/memtable.h | 37 +++++-----
be/src/olap/memtable_flush_executor.cpp | 24 +++----
be/src/olap/memtable_flush_executor.h | 4 +-
be/src/olap/memtable_memory_limiter.cpp | 4 +-
be/src/olap/memtable_writer.cpp | 40 ++++-------
be/src/olap/memtable_writer.h | 11 ++-
be/src/olap/rowset/segment_v2/segment.cpp | 57 ++++++++-------
be/src/olap/rowset/segment_v2/segment.h | 10 ++-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 6 +-
be/src/olap/segment_loader.cpp | 8 ++-
be/src/olap/segment_loader.h | 12 ++++
be/src/runtime/load_channel.cpp | 7 --
be/src/runtime/load_channel.h | 1 -
be/src/runtime/memory/mem_tracker_limiter.h | 3 -
be/src/runtime/query_context.cpp | 2 -
be/src/runtime/tablets_channel.cpp | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 84 +++++++++++++---------
be/src/runtime/workload_group/workload_group.h | 15 +---
.../workload_group/workload_group_manager.cpp | 7 ++
.../workload_group/workload_group_manager.h | 2 +
be/src/service/internal_service.cpp | 16 ++---
25 files changed, 199 insertions(+), 198 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6e459b3638b..4aea6200ce0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -626,8 +626,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
// percent of (active memtables size / all memtables size) when reach soft limit
DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
-// memtable insert memory tracker will multiply input block size with this ratio
-DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dc1f60b69f8..13437d96b8e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -674,8 +674,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent);
// percent of (active memtables size / all memtables size) when reach soft limit
DECLARE_mInt32(memtable_soft_limit_active_percent);
-// memtable insert memory tracker will multiply input block size with this ratio
-DECLARE_mDouble(memtable_insert_memory_ratio);
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5da49758865..27fbfb71d7f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
// TODO replace memory_gc_thread.
// step 6. Refresh weighted memory ratio of workload groups.
+ doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
// step 7. Analyze blocking queries.
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 0040e00ffc9..a70486e39b3 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -50,20 +50,16 @@ using namespace ErrorCode;
MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
- const std::shared_ptr<MemTracker>& insert_mem_tracker,
- const std::shared_ptr<MemTracker>& flush_mem_tracker)
- : _tablet_id(tablet_id),
+ bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info)
+ : _mem_type(MemType::ACTIVE),
+ _tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
- _insert_mem_tracker(insert_mem_tracker),
- _flush_mem_tracker(flush_mem_tracker),
_is_first_insertion(true),
_agg_functions(tablet_schema->num_columns()),
_offsets_of_aggregate_states(tablet_schema->num_columns()),
- _total_size_of_aggregate_states(0),
- _mem_usage(0) {
+ _total_size_of_aggregate_states(0) {
g_memtable_cnt << 1;
_query_thread_context.init_unlocked();
_arena = std::make_unique<vectorized::Arena>();
@@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
}
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
+ _mem_tracker = std::make_shared<MemTracker>();
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
MemTable::~MemTable() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
+ if (_is_flush_success) {
+ // If the memtable flushed successfully, then its memtracker's consumption should be 0
+ if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
+ LOG(FATAL) << "memtable flush success but consumption is not 0, it is "
+ << _mem_tracker->consumption();
+ }
+ }
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
g_memtable_cnt << -1;
if (_keys_type != KeysType::DUP_KEYS) {
@@ -159,13 +163,7 @@ MemTable::~MemTable() {
}
}
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
std::default_delete<RowInBlock>());
- _insert_mem_tracker->release(_mem_usage);
- _flush_mem_tracker->set_consumption(0);
- DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl
- << _insert_mem_tracker->log_usage();
- DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
_arena.reset();
- _agg_buffer_pool.clear();
_vec_row_comparator.reset();
_row_in_blocks.clear();
_agg_functions.clear();
@@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
Status MemTable::insert(const vectorized::Block* input_block,
const std::vector<uint32_t>& row_idxs) {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_is_first_insertion) {
_is_first_insertion = false;
auto clone_block = input_block->clone_without_columns(&_column_offset);
@@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* input_block,
row_idxs.data() + num_rows, &_column_offset));
auto block_size1 = _input_mutable_block.allocated_bytes();
g_memtable_input_block_allocated_size << block_size1 - block_size0;
- auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() *
- config::memtable_insert_memory_ratio);
- _mem_usage += input_size;
- _insert_mem_tracker->consume(input_size);
for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
}
@@ -467,10 +462,6 @@ void MemTable::_aggregate() {
}
if constexpr (!is_final) {
// if is not final, we collect the agg results to input_block and then continue to insert
- size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
- // flush will not run here, so will not duplicate `_flush_mem_tracker`
- _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
- _mem_usage = shrunked_after_agg;
_input_mutable_block.swap(_output_mutable_block);
//TODO(weixang):opt here.
std::unique_ptr<vectorized::Block> empty_input_block =
in_block.create_same_struct_block(0);
@@ -483,6 +474,7 @@ void MemTable::_aggregate() {
}
void MemTable::shrink_memtable_by_agg() {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_keys_type == KeysType::DUP_KEYS) {
return;
}
@@ -532,8 +524,8 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
_input_mutable_block.clear();
- _insert_mem_tracker->release(_mem_usage);
- _mem_usage = 0;
+ // After to_block(), all data in the arena has been saved into the block
+ _arena.reset();
*res = vectorized::Block::create_unique(_output_mutable_block.to_block());
return Status::OK();
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 70f7a9f22a0..4ae92c2d2d8 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -47,6 +47,11 @@ class TabletSchema;
class TupleDescriptor;
enum KeysType : int;
+// ACTIVE: the memtable is currently used by the writer to insert blocks
+// WRITE_FINISHED: the memtable has finished writing blocks and is in the queue waiting to be flushed
+// FLUSH: the memtable is being flushed, writing segments to disk
+enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };
+
// row pos in _input_mutable_block
struct RowInBlock {
size_t _row_pos;
@@ -171,16 +176,11 @@ class MemTable {
public:
MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
- const std::shared_ptr<MemTracker>& insert_mem_tracker,
- const std::shared_ptr<MemTracker>& flush_mem_tracker);
+ bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info);
~MemTable();
int64_t tablet_id() const { return _tablet_id; }
- size_t memory_usage() const {
- return _insert_mem_tracker->consumption() + _arena->used_size() +
- _flush_mem_tracker->consumption();
- }
+ size_t memory_usage() const { return _mem_tracker->consumption(); }
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
@@ -196,10 +196,16 @@ public:
const MemTableStat& stat() { return _stat; }
- std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; }
-
QueryThreadContext query_thread_context() { return _query_thread_context; }
+ std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
+
+ void set_flush_success() { _is_flush_success = true; }
+
+ MemType get_mem_type() { return _mem_type; }
+
+ void update_mem_type(MemType memtype) { _mem_type = memtype; }
+
private:
// for vectorized
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
@@ -209,9 +215,11 @@ private:
Status _to_block(std::unique_ptr<vectorized::Block>* res);
private:
+ std::atomic<MemType> _mem_type;
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
+ bool _is_flush_success = false;
const KeysType _keys_type;
std::shared_ptr<TabletSchema> _tablet_schema;
@@ -219,18 +227,11 @@ private:
QueryThreadContext _query_thread_context;
- // `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
- // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
- // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
- // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
- std::shared_ptr<MemTracker> _insert_mem_tracker;
- std::shared_ptr<MemTracker> _flush_mem_tracker;
+ std::shared_ptr<MemTracker> _mem_tracker;
// Only the rows will be inserted into block can allocate memory from _arena.
// In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
// reduce the number of segment files that are generated by current load
std::unique_ptr<vectorized::Arena> _arena;
- // The object buffer pool for convert tuple to row
- ObjectPool _agg_buffer_pool;
void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
const TupleDescriptor* tuple_desc);
@@ -264,8 +265,6 @@ private:
std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
std::vector<RowInBlock*> _row_in_blocks;
- // Memory usage without _arena.
- size_t _mem_usage;
size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
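For readers following the memtable.h hunk above: a minimal, self-contained sketch of the lifecycle the new MemType comments and the single _mem_tracker describe. Only the enum values come from the patch; the toy class, its members, and the transitions are illustrative assumptions, not Doris code.

#include <atomic>
#include <cstdint>
#include <iostream>

// Same states and values as the patch's enum in memtable.h.
enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };

// Toy memtable: one state field plus one tracker counter, standing in for
// MemTable::_mem_type and the single _mem_tracker that replaces the old
// insert/flush tracker pair.
struct ToyMemTable {
    std::atomic<MemType> mem_type {ACTIVE};
    std::atomic<int64_t> tracked_bytes {0}; // stands in for _mem_tracker->consumption()

    void insert(int64_t bytes) { tracked_bytes += bytes; } // writer appends rows
    void finish_write() { mem_type = WRITE_FINISHED; }     // queued, waiting for flush
    void flush_done() {                                    // segment written to disk
        mem_type = FLUSH;
        tracked_bytes = 0; // after a successful flush, consumption should be 0 again
    }
};

int main() {
    ToyMemTable mt;
    mt.insert(4096);
    mt.finish_write();
    mt.flush_done();
    std::cout << "state=" << mt.mem_type << " tracked=" << mt.tracked_bytes << "\n";
    return 0;
}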
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 887340eed70..dc911647be8 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -46,10 +46,10 @@ class MemtableFlushTask final : public Runnable {
ENABLE_FACTORY_CREATOR(MemtableFlushTask);
public:
- MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable,
+ MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
- _memtable(std::move(memtable)),
+ _memtable(memtable),
_segment_id(segment_id),
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
@@ -60,7 +60,7 @@ public:
void run() override {
auto token = _flush_token.lock();
if (token) {
- token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
+ token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
} else {
LOG(WARNING) << "flush token is deconstructed, ignore the flush
task";
}
@@ -68,7 +68,7 @@ public:
private:
std::weak_ptr<FlushToken> _flush_token;
- std::unique_ptr<MemTable> _memtable;
+ std::shared_ptr<MemTable> _memtable;
int32_t _segment_id;
int64_t _submit_task_time;
};
@@ -83,7 +83,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
return os;
}
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
+Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
{
std::shared_lock rdlk(_flush_status_lock);
DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
@@ -98,9 +98,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
return Status::OK();
}
int64_t submit_task_time = MonotonicNanos();
- auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table),
- _rowset_writer->allocate_segment_id(),
- submit_task_time);
+ auto task = MemtableFlushTask::create_shared(
+ shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
@@ -136,20 +135,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
<< ", rows: " << memtable->stat().raw_rows;
+ memtable->update_mem_type(MemType::FLUSH);
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();
{
+ SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
std::unique_ptr<vectorized::Block> block;
- // During to block method, it will release old memory and create new block, so that
- // we could not scoped it.
RETURN_IF_ERROR(memtable->to_block(&block));
- memtable->flush_mem_tracker()->consume(block->allocated_bytes());
- SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
}
+ memtable->set_flush_success();
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
@@ -158,7 +156,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}
-void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 2d20298f800..25c5a37afba 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -61,7 +61,7 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
public:
FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
- Status submit(std::unique_ptr<MemTable> mem_table);
+ Status submit(std::shared_ptr<MemTable> mem_table);
// error has happens, so we cancel this token
// And remove all tasks in the queue.
@@ -87,7 +87,7 @@ private:
private:
friend class MemtableFlushTask;
- void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+ void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time);
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index ea045b1e53e..9b9ce19f895 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -20,6 +20,7 @@
#include <bvar/bvar.h>
#include "common/config.h"
+#include "olap/memtable.h"
#include "olap/memtable_writer.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
@@ -237,13 +238,14 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
_active_writers.clear();
for (auto it = _writers.begin(); it != _writers.end();) {
if (auto writer = it->lock()) {
+ // The memtable is currently used by writer to insert blocks.
auto active_usage = writer->active_memtable_mem_consumption();
_active_mem_usage += active_usage;
if (active_usage > 0) {
_active_writers.push_back(writer);
}
_flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
- _write_mem_usage += writer->mem_consumption(MemType::WRITE);
+ _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
++it;
} else {
*it = std::move(_writers.back());
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 59916d5f1cc..e8123c48ecc 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -133,12 +133,18 @@ Status MemTableWriter::write(const vectorized::Block* block,
Status MemTableWriter::_flush_memtable_async() {
DCHECK(_flush_token != nullptr);
- std::unique_ptr<MemTable> memtable;
+ std::shared_ptr<MemTable> memtable;
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
- memtable = std::move(_mem_table);
+ memtable = _mem_table;
+ _mem_table = nullptr;
}
- return _flush_token->submit(std::move(memtable));
+ {
+ std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ memtable->update_mem_type(MemType::WRITE_FINISHED);
+ _freezed_mem_tables.push_back(memtable);
+ }
+ return _flush_token->submit(memtable);
}
Status MemTableWriter::flush_async() {
@@ -187,22 +193,10 @@ Status MemTableWriter::wait_flush() {
}
void MemTableWriter::_reset_mem_table() {
- auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
- "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
- std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string()));
- auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format(
- "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()),
- _mem_table_num++, UniqueId(_req.load_id).to_string()));
- {
- std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
- _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
- _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
- }
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
- _unique_key_mow, _partial_update_info.get(),
- mem_table_insert_tracker, mem_table_flush_tracker));
+ _unique_key_mow, _partial_update_info.get()));
}
_segment_num++;
@@ -353,15 +347,11 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
}
int64_t mem_usage = 0;
{
- std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
- if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2
- for (const auto& mem_table_tracker : _mem_table_insert_trackers) {
- mem_usage += mem_table_tracker->consumption();
- }
- }
- if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1
- for (const auto& mem_table_tracker : _mem_table_flush_trackers) {
- mem_usage += mem_table_tracker->consumption();
+ std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ for (const auto& mem_table : _freezed_mem_tables) {
+ auto mem_table_sptr = mem_table.lock();
+ if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
+ mem_usage += mem_table_sptr->memory_usage();
}
}
}
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ee7c8e1538a..ec44348b4a9 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -57,8 +57,6 @@ namespace vectorized {
class Block;
} // namespace vectorized
-enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
-
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class MemTableWriter {
@@ -123,18 +121,17 @@ private:
Status _cancel_status;
WriteRequest _req;
std::shared_ptr<RowsetWriter> _rowset_writer;
- std::unique_ptr<MemTable> _mem_table;
+ std::shared_ptr<MemTable> _mem_table;
TabletSchemaSPtr _tablet_schema;
bool _unique_key_mow = false;
// This variable is accessed from writer thread and token flush thread
// use a shared ptr to avoid use after free problem.
std::shared_ptr<FlushToken> _flush_token;
- std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
- std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
- SpinLock _mem_table_tracker_lock;
+ // Save the inactive memtables that are in the flush queue or being flushed.
+ std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
+ // The lock protects _mem_table and _freezed_mem_tables against concurrent modification or reads.
SpinLock _mem_table_ptr_lock;
- std::atomic<uint32_t> _mem_table_num = 1;
QueryThreadContext _query_thread_context;
std::mutex _lock;
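The _freezed_mem_tables comment above is the heart of the ownership change: once a memtable is frozen, the flush task holds the owning shared_ptr while the writer only observes it through weak_ptrs, so consumption can be summed without keeping flushed memtables alive. A self-contained sketch of that pattern with toy types (nothing below is Doris code):

#include <iostream>
#include <memory>
#include <vector>

// Toy stand-ins: only the ownership shape is taken from the patch.
struct ToyTable {
    int mem_type = 1;   // pretend WRITE_FINISHED
    long bytes = 1024;  // pretend memory_usage()
};

int main() {
    // The flush task owns the memtable via shared_ptr...
    auto flush_task_owner = std::make_shared<ToyTable>();
    // ...while the writer only remembers it weakly, like _freezed_mem_tables.
    std::vector<std::weak_ptr<ToyTable>> freezed {flush_task_owner};

    // Like mem_consumption(): sum only tables still alive and of the wanted type.
    long usage = 0;
    for (const auto& weak : freezed) {
        auto table = weak.lock();
        if (table != nullptr && table->mem_type == 1) {
            usage += table->bytes;
        }
    }
    std::cout << "usage while flushing: " << usage << "\n"; // 1024

    flush_task_owner.reset(); // flush finished; the task drops its reference
    std::cout << "still alive: " << (freezed[0].lock() != nullptr) << "\n"; // 0
    return 0;
}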
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index f0710a5e2ba..11457a7a332 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -450,25 +450,12 @@ Status Segment::_load_pk_bloom_filter() {
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
DCHECK(_pk_index_meta != nullptr);
DCHECK(_pk_index_reader != nullptr);
- auto status = [this]() {
- return _load_pk_bf_once.call([this] {
- RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
- // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
- return Status::OK();
- });
- }();
- if (!status.ok()) {
- remove_from_segment_cache();
- }
- return status;
-}
-void Segment::remove_from_segment_cache() const {
- if (config::disable_segment_cache) {
- return;
- }
- SegmentCache::CacheKey cache_key(_rowset_id, _segment_id);
- SegmentLoader::instance()->erase_segment(cache_key);
+ return _load_pk_bf_once.call([this] {
+ RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
+ // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
+ return Status::OK();
+ });
}
Status Segment::load_pk_index_and_bf() {
@@ -478,14 +465,6 @@ Status Segment::load_pk_index_and_bf() {
}
Status Segment::load_index() {
- auto status = [this]() { return _load_index_impl(); }();
- if (!status.ok()) {
- remove_from_segment_cache();
- }
- return status;
-}
-
-Status Segment::_load_index_impl() {
return _load_index_once.call([this] {
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
@@ -519,6 +498,32 @@ Status Segment::_load_index_impl() {
});
}
+Status Segment::healthy_status() {
+ try {
+ if (_load_index_once.has_called()) {
+ RETURN_IF_ERROR(_load_index_once.stored_result());
+ }
+ if (_load_pk_bf_once.has_called()) {
+ RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
+ }
+ if (_create_column_readers_once_call.has_called()) {
+ RETURN_IF_ERROR(_create_column_readers_once_call.stored_result());
+ }
+ if (_inverted_index_file_reader_open.has_called()) {
+ RETURN_IF_ERROR(_inverted_index_file_reader_open.stored_result());
+ }
+ // This status is set at runtime, for example, when something goes wrong while reading the segment iterator.
+ return _healthy_status.status();
+ } catch (const doris::Exception& e) {
+ // If there is an exception during load_xxx, we should not rethrow it directly because
+ // the caller may not be exception safe.
+ return e.to_status();
+ } catch (const std::exception& e) {
+ // The exception is not thrown by doris code.
+ return Status::InternalError("Unexpected error during load segment: {}", e.what());
+ }
+}
+
// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index a4f01873f4c..035860b9bc9 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -142,6 +142,12 @@ public:
Status load_pk_index_and_bf();
+ void update_healthy_status(Status new_status) { _healthy_status.update(new_status); }
+ // The segment is loaded into SegmentCache and then loads its indices. If something goes wrong
+ // while loading the indices, the segment should be removed from SegmentCache; otherwise it will
+ // keep reporting errors during queries. So we add a healthy status API; the caller should check
+ // the healthy status before using the segment.
+ Status healthy_status();
+
std::string min_key() {
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr);
return _pk_index_meta->min_key();
@@ -155,8 +161,6 @@ public:
int64_t meta_mem_usage() const { return _meta_mem_usage; }
- void remove_from_segment_cache() const;
-
// Identify the column by unique id or path info
struct ColumnIdentifier {
int32_t unique_id = -1;
@@ -222,7 +226,6 @@ private:
Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
io::IOContext& io_ctx);
- Status _load_index_impl();
Status _open_inverted_index();
Status _create_column_readers_once();
@@ -233,6 +236,7 @@ private:
io::FileReaderSPtr _file_reader;
uint32_t _segment_id;
uint32_t _num_rows;
+ AtomicStatus _healthy_status;
// 1. Tracking memory use by segment meta data such as footer or index page.
// 2. Tracking memory use by segment column reader
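The healthy-status comments above describe a check-before-use contract: record runtime failures on the segment, then consult them before trusting a cached copy. A hedged, self-contained sketch of that contract; Status, AtomicStatus, and the segment below are toy stand-ins, not the Doris classes:

#include <iostream>
#include <mutex>
#include <string>

// Minimal stand-in for doris::Status: just ok/error with a message.
struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {std::move(m)}; }
};

// Keeps the first non-OK status it sees, like the segment's _healthy_status.
class AtomicStatus {
public:
    void update(const Status& s) {
        std::lock_guard<std::mutex> l(_lock);
        if (_status.ok() && !s.ok()) _status = s;
    }
    Status status() {
        std::lock_guard<std::mutex> l(_lock);
        return _status;
    }

private:
    std::mutex _lock;
    Status _status;
};

struct ToySegment {
    AtomicStatus healthy;
    // Like SegmentIterator::next_batch: record a runtime failure on the segment.
    Status read_batch(bool fail) {
        Status s = fail ? Status::Error("io error") : Status::OK();
        if (!s.ok()) healthy.update(s);
        return s;
    }
};

int main() {
    ToySegment seg;
    (void)seg.read_batch(/*fail=*/true);
    // A later cache lookup checks health before reuse, like pop_unhealthy_segment.
    if (!seg.healthy.status().ok()) {
        std::cout << "evict and reload: " << seg.healthy.status().msg << "\n";
    }
    return 0;
}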
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 3d8e06bbc00..04ec5830d28 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -270,8 +270,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc
Status SegmentIterator::init(const StorageReadOptions& opts) {
auto status = _init_impl(opts);
- if (!status.ok() && !config::disable_segment_cache) {
- _segment->remove_from_segment_cache();
+ if (!status.ok()) {
+ _segment->update_healthy_status(status);
}
return status;
}
@@ -1925,7 +1925,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
// if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
if (!status.ok() && !status.is<END_OF_FILE>()) {
- _segment->remove_from_segment_cache();
+ _segment->update_healthy_status(status);
}
return status;
}
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index fd7e3f476ad..abc82c6f3ee 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -59,8 +59,14 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
for (int64_t i = 0; i < rowset->num_segments(); i++) {
SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
if (_segment_cache->lookup(cache_key, cache_handle)) {
- continue;
+ // Have to check the segment status here, because a cached segment may have gone wrong
+ // while loading its index or creating its column readers.
+ // This if is kept separate from the previous one to keep the logic clear.
+ if (cache_handle->pop_unhealthy_segment() == nullptr) {
+ continue;
+ }
}
+ // If the segment is not healthy, a new segment will be created to replace the unhealthy one in SegmentCache.
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
if (need_load_pk_index_and_bf) {
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index d177024242d..b3b88fa7700 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -161,6 +161,18 @@ public:
_init = true;
}
+ segment_v2::SegmentSharedPtr pop_unhealthy_segment() {
+ if (segments.empty()) {
+ return nullptr;
+ }
+ segment_v2::SegmentSharedPtr last_segment = segments.back();
+ if (last_segment->healthy_status().ok()) {
+ return nullptr;
+ }
+ segments.pop_back();
+ return last_segment;
+ }
+
private:
std::vector<segment_v2::SegmentSharedPtr> segments;
bool _init {false};
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index f8c11639719..1ac7753b197 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
- _need_release_memtracker = true;
}
}
}
@@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
rows_str << ", index id: " << entry.first << ", total_received_rows: "
<< entry.second.first
<< ", num_rows_filtered: " << entry.second.second;
}
- if (_need_release_memtracker) {
- WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
- if (wg_ptr) {
- wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
- }
- }
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" <<
_is_high_priority
<< ", sender_ip=" << _sender_ip << rows_str.str();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 6fad8c536ec..6c150ed74d9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -127,7 +127,6 @@ private:
int64_t _backend_id;
bool _enable_profile;
- bool _need_release_memtracker = false;
};
inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index faf354cca4c..251a7c25a74 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -123,9 +123,6 @@ public:
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
- // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
- std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;
-
/*
* Part 3, Memory tracking method (use carefully!)
*
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 497041ac17b..046de58fe5f 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
uint64_t group_id = 0;
if (_workload_group) {
group_id = _workload_group->id(); // before remove
- _workload_group->remove_mem_tracker_limiter(query_mem_tracker);
- _workload_group->remove_query(_query_id);
}
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 329366766f8..4d458cd440f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -446,7 +446,7 @@ void BaseTabletsChannel::refresh_profile() {
{
std::lock_guard<SpinLock> l(_tablet_writers_lock);
for (auto&& [tablet_id, writer] : _tablet_writers) {
- int64_t write_mem = writer->mem_consumption(MemType::WRITE);
+ int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
write_mem_usage += write_mem;
int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
flush_mem_usage += flush_mem;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..0488e9ec83c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
}
}
+// MemTrackerLimiter is not removed when the query context is released, so it has to be removed here.
int64_t WorkloadGroup::make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
int64_t used_memory = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
- for (const auto& trackerWptr : mem_tracker_group.trackers) {
- auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker_snapshots != nullptr) {
- tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+ for (auto trackerWptr = mem_tracker_group.trackers.begin();
+ trackerWptr != mem_tracker_group.trackers.end();) {
+ auto tracker = trackerWptr->lock();
+ if (tracker == nullptr) {
+ trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+ } else {
+ if (tracker_snapshots != nullptr) {
+ tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+ }
+ used_memory += tracker->consumption();
+ ++trackerWptr;
}
- used_memory += tracker->consumption();
}
}
- refresh_memory(used_memory);
+ // refresh total memory used.
+ _total_mem_used = used_memory;
+ // reserve memory is recorded in the query mem tracker
+ // and _total_mem_used already contains all the current reserve memory.
+ // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
+ _wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
return used_memory;
}
@@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}
-void WorkloadGroup::refresh_memory(int64_t used_memory) {
- // refresh total memory used.
- _total_mem_used = used_memory;
- // reserve memory is recorded in the query mem tracker
- // and _total_mem_used already contains all the current reserve memory.
- // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
- _wg_refresh_interval_memory_growth.store(0.0);
-}
+void WorkloadGroup::do_sweep() {
+ // Clear memtracker limiter that is registered during query or load.
+ for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+ for (auto trackerWptr = mem_tracker_group.trackers.begin();
+ trackerWptr != mem_tracker_group.trackers.end();) {
+ auto tracker = trackerWptr->lock();
+ if (tracker == nullptr) {
+ trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+ } else {
+ ++trackerWptr;
+ }
+ }
+ }
-void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+ // Clear query context that is registered during query context ctor
std::unique_lock<std::shared_mutex> wlock(_mutex);
- auto group_num = mem_tracker_ptr->group_num();
- std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
- mem_tracker_ptr->wg_tracker_limiter_group_it =
- _mem_tracker_limiter_pool[group_num].trackers.insert(
- _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
+ for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
+ if (iter->second.lock() == nullptr) {
+ iter = _query_ctxs.erase(iter);
+ } else {
+ iter++;
+ }
+ }
}
-void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
- if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
- _mem_tracker_limiter_pool[group_num].trackers.end()) {
- _mem_tracker_limiter_pool[group_num].trackers.erase(
- mem_tracker_ptr->wg_tracker_limiter_group_it);
- mem_tracker_ptr->wg_tracker_limiter_group_it =
- _mem_tracker_limiter_pool[group_num].trackers.end();
- }
+ _mem_tracker_limiter_pool[group_num].trackers.insert(
+ _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}
int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
@@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
- "{} cancel top memory overcommit tracker <{}> consumption {}.
details:{}, Execute "
+ "{} cancel top memory overcommit tracker <{}> consumption {}.
details:{}, "
+ "Execute "
"again after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
};
auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
return fmt::format(
- "{} cancel top memory used tracker <{}> consumption {}.
details:{}, Execute again "
+ "{} cancel top memory used tracker <{}> consumption {}.
details:{}, Execute "
+ "again "
"after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
@@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
_id, _name, _memory_limit, used_memory, need_free_mem);
Defer defer {[&]() {
LOG(INFO) << fmt::format(
- "[MemoryGC] work load group finished gc, id:{} name:{}, memory
limit: {}, used: "
+ "[MemoryGC] work load group finished gc, id:{} name:{}, memory
limit: {}, "
+ "used: "
"{}, need_free_mem: {}, freed memory: {}.",
_id, _name, _memory_limit, used_memory, need_free_mem,
freed_mem);
}};
@@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
- LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is illegal: "
+ LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is "
+ "illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
}
} else {
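The sweep logic above replaces the old CHECK on expired weak_ptrs with in-place erasure. A minimal sketch of that erase-while-iterating idiom (generic types; only the idiom itself is taken from the patch):

#include <iostream>
#include <list>
#include <memory>

int main() {
    auto live = std::make_shared<int>(42);
    std::list<std::weak_ptr<int>> trackers;
    trackers.push_back(live);
    trackers.push_back(std::make_shared<int>(7)); // owner dies immediately -> expired entry

    // The sweep: drop expired entries, keep live ones, never dereference a null lock().
    for (auto it = trackers.begin(); it != trackers.end();) {
        if (it->lock() == nullptr) {
            it = trackers.erase(it); // erase returns the next valid iterator
        } else {
            ++it;
        }
    }
    std::cout << "remaining trackers: " << trackers.size() << "\n"; // 1
    return 0;
}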
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2fbb4dd3030..933c5afdb4e 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -89,7 +89,8 @@ public:
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();
- void refresh_memory(int64_t used_memory);
+
+ void do_sweep();
int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
@@ -132,8 +133,6 @@ public:
void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
- void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
-
// when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
// because mem_limit is not a required property
bool is_mem_limit_valid() {
@@ -154,11 +153,6 @@ public:
return Status::OK();
}
- void remove_query(TUniqueId query_id) {
- std::unique_lock<std::shared_mutex> wlock(_mutex);
- _query_ctxs.erase(query_id);
- }
-
void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
@@ -169,11 +163,6 @@ public:
return _is_shutdown && _query_ctxs.empty();
}
- int query_num() {
- std::shared_lock<std::shared_mutex> r_lock(_mutex);
- return _query_ctxs.size();
- }
-
int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 65a8e3685c8..003f07f1db0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}
+void WorkloadGroupMgr::do_sweep() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto& [wg_id, wg] : _workload_groups) {
+ wg->do_sweep();
+ }
+}
+
struct WorkloadGroupMemInfo {
int64_t total_mem_used = 0;
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..f76e98d2606 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -50,6 +50,8 @@ public:
WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
+ void do_sweep();
+
void stop();
std::atomic<bool> _enable_cpu_hard_limit = false;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 06dad46e90c..8dc4d7bb3c2 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1669,17 +1669,13 @@ void PInternalService::reset_rpc_channel(google::protobuf::RpcController* contro
void PInternalService::hand_shake(google::protobuf::RpcController* controller,
const PHandShakeRequest* request,
PHandShakeResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([request, response, done]() {
- brpc::ClosureGuard closure_guard(done);
- if (request->has_hello()) {
- response->set_hello(request->hello());
- }
- response->mutable_status()->set_status_code(0);
- });
- if (!ret) {
- offer_failed(response, done, _light_work_pool);
- return;
+ // The light pool may be full. Handshake is used to check the connection state of brpc.
+ // It should not be affected by the thread pool logic.
+ brpc::ClosureGuard closure_guard(done);
+ if (request->has_hello()) {
+ response->set_hello(request->hello());
}
+ response->mutable_status()->set_status_code(0);
}
constexpr char HttpProtocol[] = "http://";
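The hand_shake change encodes a liveness-probe rule: a health check should not depend on the resource it is probing past (here, a possibly saturated light work pool). A generic toy sketch of the before/after; the pool type below is an assumption for illustration, and brpc itself is not used:

#include <functional>
#include <iostream>

// Toy pool that is saturated: try_offer always fails, like a full _light_work_pool.
struct ToyPool {
    bool try_offer(const std::function<void()>&) { return false; }
};

int main() {
    ToyPool light_pool;
    bool replied = false;
    auto reply_ok = [&] { replied = true; };

    // Before: the handshake queued behind real work -- a full pool fails the probe
    // even though the server is alive.
    if (!light_pool.try_offer(reply_ok)) {
        // would call offer_failed(...) and report an error to the prober
    }

    // After: the handshake is answered inline, so it only proves the connection is alive.
    reply_ok();
    std::cout << "handshake replied: " << std::boolalpha << replied << "\n"; // true
    return 0;
}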
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]