This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0075df54ddb [workloadgroup](memory) flush memtable when memory is not
enough (#54642)
0075df54ddb is described below
commit 0075df54ddb6073ddad59a844881d688c5471182
Author: yiguolei <[email protected]>
AuthorDate: Mon Aug 18 15:11:27 2025 +0800
[workloadgroup](memory) flush memtable when memory is not enough (#54642)
### What problem does this PR solve?
1. flush memtable when memory is not enough
2. cancel the query that uses more memory than min_memory_percent
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 1 +
be/src/common/daemon.cpp | 2 +-
...chema_workload_group_resource_usage_scanner.cpp | 1 -
be/src/olap/memtable_memory_limiter.cpp | 79 +---
be/src/olap/memtable_memory_limiter.h | 21 +-
be/src/runtime/load_channel_mgr.cpp | 4 +-
be/src/runtime/memory/global_memory_arbitrator.cpp | 1 -
be/src/runtime/memory/global_memory_arbitrator.h | 1 -
be/src/runtime/memory/mem_tracker_limiter.h | 2 -
be/src/runtime/query_context.cpp | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 50 ++-
be/src/runtime/workload_group/workload_group.h | 29 +-
.../workload_group/workload_group_manager.cpp | 475 ++++++++-------------
.../workload_group/workload_group_manager.h | 16 +-
.../runtime/workload_management/memory_context.h | 3 +-
.../workload_management/query_task_controller.cpp | 5 +-
.../workload_management/query_task_controller.h | 2 +-
.../runtime/workload_management/task_controller.h | 2 +
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +-
be/test/olap/memtable_memory_limiter_test.cpp | 2 +-
.../workload_group/workload_group_manager_test.cpp | 193 ++++++---
.../java/org/apache/doris/catalog/SchemaTable.java | 1 -
.../resource/workloadgroup/WorkloadGroup.java | 38 +-
.../resource/workloadgroup/WorkloadGroupMgr.java | 1 -
25 files changed, 389 insertions(+), 548 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5ab1d344b44..c7eb4136c52 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1391,6 +1391,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
// paused query in queue timeout(ms) will be resumed or canceled
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
+DEFINE_Int64(wait_cancel_release_memory_ms, "5000");
+
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
DEFINE_mBool(force_azure_blob_global_endpoint, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4595f416c49..5f5900165b0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1447,6 +1447,7 @@ DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);
+DECLARE_Int64(wait_cancel_release_memory_ms);
DECLARE_mBool(check_segment_when_build_rowset_meta);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 4651d1de36e..a5f01ed1975 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -348,7 +348,7 @@ void Daemon::memory_maintenance_thread() {
// step 6. Refresh weighted memory ratio of workload groups.
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
-
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
+
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_memory_state();
// step 7: handle paused queries(caused by memory insufficient)
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
diff --git
a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
index b89823f78d2..bae6d2dd51d 100644
---
a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
+++
b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
@@ -38,7 +38,6 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaBackendWorkloadGroupResourceUsage::
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
- {"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
};
SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index a55b7fe6f59..c8caf5194d6 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -119,43 +119,11 @@ int64_t MemTableMemoryLimiter::_need_flush() {
return need_flush - _queue_mem_usage - _flush_mem_usage;
}
-void MemTableMemoryLimiter::handle_workload_group_memtable_flush(
- WorkloadGroupPtr wg, std::function<bool()> cancel_check) {
- // It means some query is pending on here to flush memtable and to
continue running.
- // So that should wait here.
- // Wait at most 3s, because this code is not aware cancel flag. If the
load task is cancelled
- // Should releae memory quickly.
- using namespace std::chrono_literals;
- int32_t max_sleep_times = 30;
- int32_t sleep_times = max_sleep_times;
- MonotonicStopWatch timer;
- timer.start();
- while (wg != nullptr && wg->enable_write_buffer_limit() &&
wg->exceed_write_buffer_limit() &&
- sleep_times > 0) {
- if (cancel_check && cancel_check()) {
- LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
- << (wg == nullptr ? "null" : wg->debug_string());
- return;
- }
- std::this_thread::sleep_for(100ms);
- --sleep_times;
- }
- if (sleep_times < max_sleep_times) {
- timer.stop();
- VLOG_DEBUG << "handle_workload_group_memtable_flush waited "
- << PrettyPrinter::print(timer.elapsed_time(),
TUnit::TIME_NS)
- << ", wg: " << wg->debug_string();
- }
- // Check process memory again.
- _handle_memtable_flush(wg, cancel_check);
-}
-
-void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
- std::function<bool()>
cancel_check) {
+void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()>
cancel_check) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
do {
-
DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", {
+
DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
LOG(INFO) << "debug memtable limit reached";
break;
});
@@ -176,8 +144,7 @@ void
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
}
}
if (cancel_check && cancel_check()) {
- LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
- << (wg == nullptr ? "null" : wg->debug_string());
+ LOG(INFO) << "cancelled when waiting for memtable flush";
return;
}
first = false;
@@ -192,15 +159,14 @@ void
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
<< ", active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " <<
PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage)
- << ", need flush: " <<
PrettyPrinter::print_bytes(need_flush)
- << ", wg: " << (wg ? wg->debug_string() : "null");
+ << ", need flush: " <<
PrettyPrinter::print_bytes(need_flush);
if (VLOG_DEBUG_IS_ON) {
auto log_str = doris::ProcessProfile::instance()
->memory_profile()
->process_memory_detail_str();
LOG_LONG_STRING(INFO, log_str);
}
- _flush_active_memtables(0, need_flush);
+ _flush_active_memtables(need_flush);
}
} while (_hard_limit_reached() && !_load_usage_low());
g_memtable_memory_limit_waiting_threads << -1;
@@ -216,37 +182,11 @@ void
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
<< ", memtable writers num: " << _writers.size()
<< ", active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " <<
PrettyPrinter::print_bytes(_queue_mem_usage)
- << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage)
- << ", wg: " << (wg ? wg->debug_string() : "null.");
- }
-}
-
-int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id,
int64_t need_flush) {
- std::unique_lock<std::mutex> l(_lock);
- return _flush_active_memtables(wg_id, need_flush);
-}
-
-void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id,
int64_t* active_bytes,
- int64_t*
queue_bytes,
- int64_t*
flush_bytes) {
- std::unique_lock<std::mutex> l(_lock);
- *active_bytes = 0;
- *queue_bytes = 0;
- *flush_bytes = 0;
- for (auto it = _writers.begin(); it != _writers.end(); ++it) {
- if (auto writer = it->lock()) {
- // If wg id is specified, but wg id not match, then not need flush
- if (writer->workload_group_id() != wg_id) {
- continue;
- }
- *active_bytes += writer->active_memtable_mem_consumption();
- *queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
- *flush_bytes += writer->mem_consumption(MemType::FLUSH);
- }
+ << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage);
}
}
-int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t
need_flush) {
+int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
if (need_flush <= 0) {
return 0;
}
@@ -278,10 +218,7 @@ int64_t
MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t n
if (w == nullptr) {
continue;
}
- // If wg id is specified, but wg id not match, then not need flush
- if (wg_id != 0 && w->workload_group_id() != wg_id) {
- continue;
- }
+
int64_t mem = w->active_memtable_mem_consumption();
if (mem < sort_mem * 0.9) {
// if the memtable writer just got flushed, don't flush it again
diff --git a/be/src/olap/memtable_memory_limiter.h
b/be/src/olap/memtable_memory_limiter.h
index 038a484273c..34dcb2b06b4 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -40,13 +40,11 @@ public:
Status init(int64_t process_mem_limit);
- void handle_workload_group_memtable_flush(WorkloadGroupPtr wg,
- std::function<bool()>
cancel_check);
-
- int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t
need_flush_bytes);
-
- void get_workload_group_memtable_usage(uint64_t wg_id, int64_t*
active_bytes,
- int64_t* queue_bytes, int64_t*
flush_bytes);
+ // check if the total mem consumption exceeds limit.
+ // If yes, it will flush memtable to try to reduce memory consumption.
+ // Every write operation will call this API to check if need flush
memtable OR hang
+ // when memory is not available.
+ void handle_memtable_flush(std::function<bool()> cancel_check);
void register_writer(std::weak_ptr<MemTableWriter> writer);
@@ -57,12 +55,6 @@ public:
int64_t mem_usage() const { return _mem_usage; }
private:
- // check if the total mem consumption exceeds limit.
- // If yes, it will flush memtable to try to reduce memory consumption.
- // Every write operation will call this API to check if need flush
memtable OR hang
- // when memory is not available.
- void _handle_memtable_flush(WorkloadGroupPtr wg, std::function<bool()>
cancel_check);
-
static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
static inline int64_t _process_used_mem_more_than_soft_mem_limit();
@@ -70,9 +62,8 @@ private:
bool _hard_limit_reached();
bool _load_usage_low();
int64_t _need_flush();
- int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
+ int64_t _flush_active_memtables(int64_t need_flush);
void _refresh_mem_tracker();
-
std::mutex _lock;
std::condition_variable _hard_limit_end_cond;
int64_t _mem_usage = 0;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 242e43eabab..0bb352d2feb 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -152,8 +152,8 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
- channel->workload_group(), [channel]() { return
channel->is_cancelled(); });
+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+ [channel]() { return channel->is_cancelled(); });
if (channel->is_cancelled()) {
return Status::Cancelled("LoadChannel has been cancelled: {}.",
load_id.to_string());
}
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 5aed0721fff..8527e719882 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -62,7 +62,6 @@ std::atomic<double>
GlobalMemoryArbitrator::last_periodic_refreshed_cache_capaci
std::atomic<double>
GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted {1};
// The value that take affect
std::atomic<double>
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
-std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit
{false};
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
index 7ac98b1c1f2..ae685f3fe36 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -166,7 +166,6 @@ public:
static std::atomic<double>
last_memory_exceeded_cache_capacity_adjust_weighted;
// The value that take affect
static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
- static std::atomic<bool> any_workload_group_exceed_limit;
static void notify_cache_adjust_capacity() {
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 7dd66eafe1b..da9285e5255 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -220,8 +220,6 @@ public:
static void make_top_consumption_tasks_tracker_profile(RuntimeProfile*
profile, int top_num);
static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
- int64_t write_buffer_size() const { return _write_tracker->consumption(); }
-
std::shared_ptr<MemTrackerLimiter> write_tracker() { return
_write_tracker; }
void print_log_usage(const std::string& msg);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c9955c25420..ece5ebb25fc 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -181,7 +181,7 @@ void QueryContext::_init_resource_context() {
}
void QueryContext::init_query_task_controller() {
- _resource_ctx->set_task_controller(QueryTaskController::create(this));
+
_resource_ctx->set_task_controller(QueryTaskController::create(shared_from_this()));
_resource_ctx->task_controller()->set_task_id(_query_id);
_resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
_resource_ctx->task_controller()->set_query_type(_query_options.query_type);
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index e0e58ba9c41..5cf23ca4c41 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -56,7 +56,6 @@ const static int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 80;
const static int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 95;
// This is a invalid value, and should ignore this value during usage
const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
-const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info)
: _id(wg_info.id),
@@ -69,7 +68,6 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info)
_max_memory_percent(wg_info.max_memory_percent),
_memory_low_watermark(wg_info.memory_low_watermark),
_memory_high_watermark(wg_info.memory_high_watermark),
- _load_buffer_ratio(wg_info.write_buffer_ratio),
_scan_thread_num(wg_info.scan_thread_num),
_max_remote_scan_thread_num(wg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(wg_info.min_remote_scan_thread_num),
@@ -82,6 +80,10 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo&
wg_info)
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(data_dir.metric_name);
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>();
+ if (_max_memory_percent > 0) {
+ _min_memory_limit = static_cast<int64_t>(
+ static_cast<double>(_memory_limit * _min_memory_percent) /
_max_memory_percent);
+ }
_wg_metrics = std::make_shared<WorkloadGroupMetrics>(this);
}
@@ -125,20 +127,15 @@ std::string WorkloadGroup::_memory_debug_string() const {
auto mem_used_ratio_int = (int64_t)(mem_used_ratio * 100 + 0.5);
mem_used_ratio = (double)mem_used_ratio_int / 100;
return fmt::format(
- "min_memory_percent = {}% , max_memory_percent = {}% ,
memory_limit = {}B, "
+ "min_memory_percent = {}% , max_memory_percent = {}% ,
memory_limit = {}B, " // add a blank space after % to avoid log4j format bugs
"slot_memory_policy = {}, total_query_slot_count = {}, "
"memory_low_watermark = {}, memory_high_watermark = {}, "
- "enable_write_buffer_limit = {}, write_buffer_ratio = {}% , " //
add a blackspace after % to avoid log4j format bugs
- "write_buffer_limit = {}, "
- "mem_used_ratio = {}, total_mem_used = {}(write_buffer_size = {}, "
+ "mem_used_ratio = {}, total_mem_used = {}, "
"wg_refresh_interval_memory_growth = {}",
_min_memory_percent, _max_memory_percent,
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
to_string(_slot_mem_policy),
- _total_query_slot_count, _memory_low_watermark,
_memory_high_watermark,
- _enable_write_buffer_limit, _load_buffer_ratio,
- PrettyPrinter::print(write_buffer_limit(), TUnit::BYTES),
mem_used_ratio,
+ _total_query_slot_count, _memory_low_watermark,
_memory_high_watermark, mem_used_ratio,
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
- PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(),
TUnit::BYTES));
}
@@ -177,8 +174,12 @@ void WorkloadGroup::check_and_update(const
WorkloadGroupInfo& wg_info) {
_scan_bytes_per_second = wg_info.read_bytes_per_second;
_remote_scan_bytes_per_second =
wg_info.remote_read_bytes_per_second;
_total_query_slot_count = wg_info.total_query_slot_count;
- _load_buffer_ratio = wg_info.write_buffer_ratio;
_slot_mem_policy = wg_info.slot_mem_policy;
+ if (_max_memory_percent > 0) {
+ _min_memory_limit = static_cast<int64_t>(
+ static_cast<double>(_memory_limit *
_min_memory_percent) /
+ _max_memory_percent);
+ }
} else {
return;
}
@@ -188,7 +189,6 @@ void WorkloadGroup::check_and_update(const
WorkloadGroupInfo& wg_info) {
// MemtrackerLimiter is not removed during query context release, so that
should remove it here.
int64_t WorkloadGroup::refresh_memory_usage() {
int64_t fragment_used_memory = 0;
- int64_t write_buffer_size = 0;
{
std::shared_lock<std::shared_mutex> r_lock(_mutex);
for (const auto& pair : _resource_ctxs) {
@@ -198,13 +198,11 @@ int64_t WorkloadGroup::refresh_memory_usage() {
}
DCHECK(resource_ctx->memory_context()->mem_tracker() != nullptr);
fragment_used_memory +=
resource_ctx->memory_context()->current_memory_bytes();
- write_buffer_size +=
resource_ctx->memory_context()->mem_tracker()->write_buffer_size();
}
}
- _total_mem_used = fragment_used_memory + write_buffer_size;
+ _total_mem_used = fragment_used_memory;
_wg_metrics->update_memory_used_bytes(_total_mem_used);
- _write_buffer_size = write_buffer_size;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset
_wg_refresh_interval_memory_growth.
@@ -228,6 +226,21 @@ void WorkloadGroup::do_sweep() {
}
}
+#ifdef BE_TEST
+void WorkloadGroup::clear_cancelled_resource_ctx() {
+ // Clear resource context that is registered during add_resource_ctx
+ std::unique_lock<std::shared_mutex> wlock(_mutex);
+ for (auto iter = _resource_ctxs.begin(); iter != _resource_ctxs.end();) {
+ auto ctx = iter->second.lock();
+ if (ctx != nullptr && ctx->task_controller()->is_cancelled()) {
+ iter = _resource_ctxs.erase(iter);
+ } else {
+ iter++;
+ }
+ }
+}
+#endif
+
int64_t WorkloadGroup::revoke_memory(int64_t need_free_mem, const std::string&
revoke_reason,
RuntimeProfile* profile) {
if (need_free_mem <= 0) {
@@ -447,12 +460,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
total_query_slot_count = tworkload_group_info.total_query_slot_count;
}
- // 17 load buffer memory limit
- int write_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE;
- if (tworkload_group_info.__isset.write_buffer_ratio) {
- write_buffer_ratio = tworkload_group_info.write_buffer_ratio;
- }
-
// 18 slot memory policy
TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::NONE;
if (tworkload_group_info.__isset.slot_memory_policy) {
@@ -476,7 +483,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.remote_read_bytes_per_second = remote_read_bytes_per_second,
.total_query_slot_count = total_query_slot_count,
.slot_mem_policy = slot_mem_policy,
- .write_buffer_ratio = write_buffer_ratio,
.pipeline_exec_thread_num = exec_thread_num,
.max_flush_thread_num = max_flush_thread_num,
.min_flush_thread_num = min_flush_thread_num};
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 800c92d05c3..2677596b82b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -61,7 +61,7 @@ class WorkloadGroup : public
std::enable_shared_from_this<WorkloadGroup> {
ENABLE_FACTORY_CREATOR(WorkloadGroup);
public:
- explicit WorkloadGroup(const WorkloadGroupInfo& wg_info);
+ WorkloadGroup(const WorkloadGroupInfo& wg_info);
virtual ~WorkloadGroup();
@@ -77,19 +77,14 @@ public:
int64_t total_mem_used() const { return _total_mem_used; }
- int64_t write_buffer_size() const { return _write_buffer_size; }
-
- void enable_write_buffer_limit(bool enable_limit) {
_enable_write_buffer_limit = enable_limit; }
-
- bool enable_write_buffer_limit() const { return
_enable_write_buffer_limit; }
-
- bool exceed_write_buffer_limit() const { return _write_buffer_size >
write_buffer_limit(); }
-
// make memory snapshots and refresh total memory used at the same time.
int64_t refresh_memory_usage();
int64_t memory_used();
void do_sweep();
+#ifdef BE_TEST
+ void clear_cancelled_resource_ctx();
+#endif
int memory_low_watermark() const {
return _memory_low_watermark.load(std::memory_order_relaxed);
@@ -99,8 +94,6 @@ public:
return _memory_high_watermark.load(std::memory_order_relaxed);
}
- void set_weighted_memory_ratio(double ratio);
-
int total_query_slot_count() const {
return _total_query_slot_count.load(std::memory_order_relaxed);
}
@@ -141,6 +134,8 @@ public:
return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
}
+ int64_t min_memory_limit() const { return _min_memory_limit; }
+
Status add_resource_ctx(TUniqueId query_id,
std::shared_ptr<ResourceContext> resource_ctx) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
@@ -207,8 +202,6 @@ public:
friend class WorkloadGroupMetrics;
friend class WorkloadGroupMgr;
- int64_t write_buffer_limit() const { return _memory_limit *
_load_buffer_ratio / 100; }
-
int64_t revoke_memory(int64_t need_free_mem, const std::string&
revoke_reason,
RuntimeProfile* profile);
@@ -228,15 +221,12 @@ private:
int64_t _version;
std::atomic<int> _min_cpu_percent = 0;
std::atomic<int> _max_cpu_percent = 100;
- std::atomic<int64_t> _memory_limit = 1 << 30; // Default to 1GB
+ std::atomic<int64_t> _memory_limit = 1 << 30; // Default to 1GB
+ std::atomic<int64_t> _min_memory_limit = 1 << 26; // Default to 64MB
std::atomic<int> _min_memory_percent = 0;
std::atomic<int> _max_memory_percent = 100;
std::atomic<int> _memory_low_watermark;
std::atomic<int> _memory_high_watermark;
- // For example, load memtable, write to parquet.
- // If the wg's memory reached high water mark, then the load buffer
- // will be restricted to this limit.
- int64_t _load_buffer_ratio = 0;
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
@@ -246,9 +236,7 @@ private:
std::atomic<int> _total_query_slot_count = 0;
std::atomic<TWgSlotMemoryPolicy::type> _slot_mem_policy
{TWgSlotMemoryPolicy::NONE};
- std::atomic<bool> _enable_write_buffer_limit = false;
std::atomic_int64_t _total_mem_used = 0; // bytes
- std::atomic_int64_t _write_buffer_size = 0;
std::atomic_int64_t _wg_refresh_interval_memory_growth;
// means workload group is mark dropped
// new query can not submit
@@ -292,7 +280,6 @@ struct WorkloadGroupInfo {
const int64_t remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
const TWgSlotMemoryPolicy::type slot_mem_policy =
TWgSlotMemoryPolicy::NONE;
- const int write_buffer_ratio = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index e4dba22f668..5b2fd2efd6c 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -47,10 +47,9 @@ const static std::string INTERNAL_NORMAL_WG_NAME = "normal";
const static uint64_t INTERNAL_NORMAL_WG_ID = 1;
PausedQuery::PausedQuery(std::shared_ptr<ResourceContext> resource_ctx, double
cache_ratio,
- bool any_wg_exceed_limit, int64_t reserve_size)
+ int64_t reserve_size)
: resource_ctx_(resource_ctx),
cache_ratio_(cache_ratio),
- any_wg_exceed_limit_(any_wg_exceed_limit),
reserve_size_(reserve_size),
query_id_(print_id(resource_ctx->task_controller()->task_id())) {
enqueue_at = std::chrono::system_clock::now();
@@ -65,26 +64,8 @@ WorkloadGroupPtr
WorkloadGroupMgr::get_or_create_workload_group(
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
// 1. update internal wg's id
if (fe_wg_info.name == INTERNAL_NORMAL_WG_NAME) {
- WorkloadGroupPtr wg_ptr = nullptr;
- uint64_t old_wg_id = -1;
- auto before_wg_size = _workload_groups.size();
- for (auto& wg_pair : _workload_groups) {
- uint64_t wg_id = wg_pair.first;
- WorkloadGroupPtr wg = wg_pair.second;
- if (INTERNAL_NORMAL_WG_NAME == wg->name() && wg_id !=
fe_wg_info.id) {
- wg_ptr = wg_pair.second;
- old_wg_id = wg_id;
- break;
- }
- }
- if (wg_ptr) {
- _workload_groups.erase(old_wg_id);
- wg_ptr->set_id(fe_wg_info.id);
- _workload_groups[wg_ptr->id()] = wg_ptr;
- LOG(INFO) << "[topic_publish_wg] normal wg id changed, before: "
<< old_wg_id
- << ", after:" << wg_ptr->id() << ", wg size:" <<
before_wg_size << ", "
- << _workload_groups.size();
- }
+        // the normal wg's id may not be equal to BE's id, so we need to update it
+ reset_workload_group_id(INTERNAL_NORMAL_WG_NAME, fe_wg_info.id);
}
// 2. check and update wg
@@ -94,7 +75,7 @@ WorkloadGroupPtr
WorkloadGroupMgr::get_or_create_workload_group(
return workload_group;
}
- auto new_task_group = std::make_shared<WorkloadGroup>(fe_wg_info);
+ auto new_task_group = WorkloadGroup::create_shared(fe_wg_info);
_workload_groups[fe_wg_info.id] = new_task_group;
return new_task_group;
}
@@ -144,6 +125,27 @@ WorkloadGroupPtr
WorkloadGroupMgr::get_group(std::vector<uint64_t>& id_list) {
return ret_wg;
}
+void WorkloadGroupMgr::reset_workload_group_id(std::string
workload_group_name, uint64_t new_id) {
+ WorkloadGroupPtr wg_ptr = nullptr;
+ uint64_t old_wg_id = -1;
+ for (auto& wg_pair : _workload_groups) {
+ uint64_t wg_id = wg_pair.first;
+ WorkloadGroupPtr wg = wg_pair.second;
+ if (workload_group_name == wg->name() && wg_id != new_id) {
+ wg_ptr = wg_pair.second;
+ old_wg_id = wg_id;
+ break;
+ }
+ }
+ if (wg_ptr) {
+ _workload_groups.erase(old_wg_id);
+ wg_ptr->set_id(new_id);
+ _workload_groups[wg_ptr->id()] = wg_ptr;
+ LOG(INFO) << "workload group's id changed, before: " << old_wg_id
+ << ", after:" << wg_ptr->id();
+ }
+}
+
void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t>
used_wg_id) {
int64_t begin_time = MonotonicMillis();
// 1 get delete group without running queries
@@ -220,58 +222,24 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>>();
};
-void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
+void WorkloadGroupMgr::refresh_workload_group_memory_state() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
// 1. make all workload groups memory snapshots(refresh workload groups
total memory used at the same time)
// and calculate total memory used of all queries.
int64_t all_workload_groups_mem_usage = 0;
- bool has_wg_exceed_limit = false;
for (auto& [wg_id, wg] : _workload_groups) {
all_workload_groups_mem_usage += wg->refresh_memory_usage();
- if (wg->exceed_limit()) {
- has_wg_exceed_limit = true;
- }
}
- doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit =
has_wg_exceed_limit;
if (all_workload_groups_mem_usage <= 0) {
return;
}
- // 2. calculate weighted memory limit ratio.
- // when construct workload group, mem_limit is equal to
(process_memory_limit * group_limit_percent),
- // here, it is assumed that the available memory of workload groups is
equal to process_memory_limit.
- //
- // but process_memory_usage is actually bigger than
all_workload_groups_mem_usage,
- // because public_memory of page cache, allocator cache, segment cache
etc. are included in process_memory_usage.
- // so actual available memory of the workload groups is equal to
(process_memory_limit - public_memory)
- //
- // we will exclude this public_memory when calculate workload group
mem_limit.
- // so a ratio is calculated to multiply the workload group mem_limit from
the previous construction.
- auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
- auto process_memory_limit = MemInfo::mem_limit();
- double weighted_memory_limit_ratio = 1;
- // if all_workload_groups_mem_usage is greater than process_memory_usage,
it means that the memory statistics
- // of the workload group are inaccurate.
- // the reason is that query/load/etc. tracked is virtual memory, and
virtual memory is not used in time.
- //
- // At this time, weighted_memory_limit_ratio is equal to 1, and workload
group mem_limit is still equal to
- // (process_memory_limit * group_limit_percent), this may cause query
spill to occur earlier,
- // However, there is no good solution at present, but we cannot predict
when these virtual memory will be used.
- if (all_workload_groups_mem_usage < process_memory_usage) {
- int64_t public_memory = process_memory_usage -
all_workload_groups_mem_usage;
- weighted_memory_limit_ratio = 1 - (double)public_memory /
(double)process_memory_limit;
- // Round the value from 1% to 100%.
- weighted_memory_limit_ratio = std::floor(weighted_memory_limit_ratio *
100) / 100;
- }
-
- std::string debug_msg = fmt::format(
- "\nProcess Memory Summary: {}, {}, all workload groups memory
usage: {}, "
- "weighted_memory_limit_ratio: {}",
- doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
- doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
- PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
- weighted_memory_limit_ratio);
+ std::string debug_msg =
+ fmt::format("\nProcess Memory Summary: {}, {}, all workload groups
memory usage: {}",
+
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
+
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
+ PrettyPrinter::print(all_workload_groups_mem_usage,
TUnit::BYTES));
LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
update_queries_limit_(wg.second, false);
@@ -300,7 +268,6 @@ void
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
4, wg->get_metrics()->get_local_scan_bytes_per_second(),
block);
SchemaScannerHelper::insert_int64_value(
5, wg->get_metrics()->get_remote_scan_bytes_per_second(),
block);
- SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(),
block);
}
}
@@ -322,7 +289,7 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<ResourceContext>&
auto&& [it, inserted] = _paused_queries_list[wg].emplace(
resource_ctx,
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
- doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit,
reserve_size);
+ reserve_size);
// Check if this is an invalid reserve, for example, if the reserve size
is too large, larger than the query limit
// if hard limit is enabled, then not need enable other queries hard limit.
if (inserted) {
@@ -351,7 +318,34 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
std::unique_lock<std::mutex> lock(_paused_queries_lock);
- bool has_revoked_from_other_group = false;
+ for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
+ auto& queries_list = it->second;
+ for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
+ auto resource_ctx = query_it->resource_ctx_.lock();
+ // The query is finished during in paused list.
+ if (resource_ctx == nullptr) {
+ LOG(INFO) << "Query: " << query_it->query_id() << " is
nullptr, erase it.";
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
+            // If any task is cancelled and the time since cancellation is less than
+            // config::wait_cancel_release_memory_ms, just return, because the
+            // cancellation may release memory, and then other tasks may not need to
+            // be cancelled or to spill to disk.
+ if (resource_ctx->task_controller()->is_cancelled() &&
+ resource_ctx->task_controller()->cancel_elapsed_millis() <
+ config::wait_cancel_release_memory_ms) {
+ return;
+ }
+ ++query_it;
+ }
+ if (queries_list.empty()) {
+ it = _paused_queries_list.erase(it);
+ continue;
+ } else {
+ // Finished deal with one workload group, and should deal with
next one.
+ ++it;
+ }
+ }
+
bool has_query_exceed_process_memlimit = false;
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
@@ -364,7 +358,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
bool has_changed_hard_limit = false;
- int64_t flushed_memtable_bytes = 0;
+ bool exceed_low_watermark = false;
+ bool exceed_high_watermark = false;
+ wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
// If the query is paused because its limit exceed the query itself's
memlimit, then just spill disk.
// The query's memlimit is set using slot mechanism and its value is
set using the user settings, not
// by weighted value. So if reserve failed, then it is actually exceed
limit.
@@ -376,12 +372,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
- if (resource_ctx->task_controller()->is_cancelled()) {
- LOG(INFO) << "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
- << " was canceled, remove from paused list";
- query_it = queries_list.erase(query_it);
- continue;
- }
if (resource_ctx->task_controller()
->paused_reason()
@@ -423,7 +413,10 @@ void WorkloadGroupMgr::handle_paused_queries() {
if (resource_ctx->memory_context()->adjusted_mem_limit() <
resource_ctx->memory_context()->current_memory_bytes() +
query_it->reserve_size_) {
-
resource_ctx->memory_context()->effect_adjusted_mem_limit();
+                        // The query does not exceed its own query limit, but exceeds the
+                        // expected query limit when the workload group memory is not enough;
+                        // use the smaller adjusted memory limit so that the query now
+                        // exceeds its query limit.
+ resource_ctx->memory_context()->set_mem_limit(
+
resource_ctx->memory_context()->adjusted_mem_limit());
resource_ctx->task_controller()->set_memory_sufficient(true);
LOG(INFO) << "Workload group memory reserve failed because
"
<<
resource_ctx->task_controller()->debug_string() << " reserve size "
@@ -435,15 +428,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
- if (flushed_memtable_bytes <= 0) {
- flushed_memtable_bytes = flush_memtable_from_group_(wg);
- }
- if (flushed_memtable_bytes > 0) {
- // Flushed some memtable, just wait flush finished and not
do anything more.
- wg->enable_write_buffer_limit(true);
- ++query_it;
- continue;
- }
// when running here, current query adjusted_mem_limit < query
memory consumption + reserve_size,
// which means that the current query itself has not exceeded
the memory limit.
@@ -498,11 +482,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
} else {
// Should not put the query back to task scheduler
immediately, because when wg's memory not sufficient,
// and then set wg's flag, other query may not free memory
very quickly.
- if (query_it->elapsed_time() >
config::spill_in_paused_queue_timeout_ms) {
+                    // If the workload group's memory usage is less than the low
+                    // watermark, then dispatch the query to run.
+ if (query_it->elapsed_time() >
config::spill_in_paused_queue_timeout_ms ||
+ !exceed_low_watermark) {
// set wg's memory to sufficient, then add it back to
task scheduler to run.
LOG(INFO) << "Query: "
<<
print_id(resource_ctx->task_controller()->task_id())
- << " will be resume.";
+ << " has waited in paused query queue for "
+ << query_it->elapsed_time() << " ms. Resume
it.";
resource_ctx->task_controller()->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
continue;
@@ -512,9 +499,21 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
} else {
+ if (revoking_memory_from_other_query_) {
+ // Previously, we have revoked memory from other query,
and the cancel stage finished.
+ // So, resume all queries now.
+
resource_ctx->task_controller()->set_memory_sufficient(true);
+ VLOG_DEBUG << "Query " <<
print_id(resource_ctx->task_controller()->task_id())
+ << " is blocked due to process memory not
enough, but already "
+                                  "cancelled some queries, resume it now.";
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
has_query_exceed_process_memlimit = true;
// If wg's memlimit not exceed, but process memory exceed, it
means cache or other metadata
// used too much memory. Should clean all cache here.
+                // Clear all caches, not just part of them, because the cache thread
+                // already tries to release the cache step by step, and that alone is
+                // not sufficient here.
//
// here query is paused because of PROCESS_MEMORY_EXCEEDED,
// normally, before process memory exceeds, daemon thread
`refresh_cache_capacity` will
@@ -541,68 +540,48 @@ void WorkloadGroupMgr::handle_paused_queries() {
// need to check config::disable_memory_gc here, if not, when
config::disable_memory_gc == true,
// cache is not adjusted, query_it->cache_ratio_ will always
be 1, and this if branch will nenver
// execute, this query will never be resumed, and will
deadlock here.
- if ((!config::disable_memory_gc && query_it->cache_ratio_ <
0.05) ||
- config::disable_memory_gc) {
- // 1. Check if could revoke some memory from memtable
- if (flushed_memtable_bytes <= 0) {
- // if the process memory has exceeded the limit, it is
expected that
- // `MemTableMemoryLimiter` will flush most of the
memtable.
- // but if the process memory is not exceeded, and the
current query expected reserve memory
- // to be too large, the other parts of the process
cannot perceive the reserve memory size,
- // so it is expected to flush memtable in
`handle_paused_queries`.
- flushed_memtable_bytes =
flush_memtable_from_group_(wg);
- }
- if (flushed_memtable_bytes > 0) {
- // Flushed some memtable, just wait flush finished and
not do anything more.
- wg->enable_write_buffer_limit(true);
- ++query_it;
- continue;
- }
- // TODO should wait here to check if the process has
release revoked_size memory and then continue.
- if (!has_revoked_from_other_group) {
- // `need_free_mem` is equal to the `reserve_size_` of
the first query
- // that `handle_paused_queries` reaches here this time.
- // this means that at least `reserve_size_` memory is
released from other wgs.
- // the released memory at least allows the current
query to execute,
- // but we will wake up all queries after this
`handle_paused_queries`,
- // even if the released memory is not enough for all
queries to execute,
- // but this can simplify the behavior and omit the
query priority.
- int64_t revoked_size =
revoke_memory_from_other_overcommited_groups_(
- resource_ctx, query_it->reserve_size_);
- if (revoked_size > 0) {
- has_revoked_from_other_group = true;
-
resource_ctx->task_controller()->set_memory_sufficient(true);
- VLOG_DEBUG << "Query: "
- <<
print_id(resource_ctx->task_controller()->task_id())
- << " is resumed after revoke memory
from other group.";
- query_it = queries_list.erase(query_it);
- // Do not care if the revoked_size > reserve size,
and try to run again.
- continue;
+ if (query_it->cache_ratio_ < 0.05 ||
config::disable_memory_gc) {
+                    // If the workload group's memory usage > min memory, it means the
+                    // workload group uses too much memory under memory contention;
+                    // it should just spill.
+ if (wg->total_mem_used() > wg->min_memory_limit()) {
+ auto revocable_tasks =
+
resource_ctx->task_controller()->get_revocable_tasks();
+ if (revocable_tasks.empty()) {
+ Status status = Status::MemoryLimitExceeded(
+ "Workload group memory usage {} > min
memory {}, but no "
+ "revocable tasks",
+ wg->total_mem_used(),
wg->min_memory_limit());
+
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+
resource_ctx->task_controller()->task_id(), status);
+ revoking_memory_from_other_query_ = true;
+ // If any query is cancelled, then skip others
because it will release many memory and
+ // other query may not need release memory.
+ return;
} else {
- bool spill_res = handle_single_query_(
- resource_ctx, query_it->reserve_size_,
query_it->elapsed_time(),
-
resource_ctx->task_controller()->paused_reason());
- if (spill_res) {
- VLOG_DEBUG << "Query: "
- <<
print_id(resource_ctx->task_controller()->task_id())
- << " remove from paused list";
- query_it = queries_list.erase(query_it);
- continue;
- } else {
- ++query_it;
- continue;
+ SCOPED_ATTACH_TASK(resource_ctx);
+ auto status =
resource_ctx->task_controller()->revoke_memory();
+ if (!status.ok()) {
+
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+
resource_ctx->task_controller()->task_id(), status);
+ revoking_memory_from_other_query_ = true;
+ return;
}
+ query_it = queries_list.erase(query_it);
+ continue;
}
- } else {
- // If any query is cancelled during process limit
stage, should resume other query and
- // do not do any check now.
-
resource_ctx->task_controller()->set_memory_sufficient(true);
- VLOG_DEBUG
- << "Query: " <<
print_id(resource_ctx->task_controller()->task_id())
- << " remove from paused list";
- query_it = queries_list.erase(query_it);
- continue;
}
+
+                    // Other workload groups may use a lot of memory; revoke memory
+                    // from those workload groups by cancelling their queries.
+ int64_t revoked_size = revoke_memory_from_other_groups_();
+ if (revoked_size > 0) {
+ // Revoke memory from other workload groups will
cancel some queries, wait them cancel finished
+ // and then check it again.
+ revoking_memory_from_other_query_ = true;
+ return;
+ }
+
+ // TODO revoke from memtable
}
// `cache_ratio_ > 0.05` means that the cache has not been
cleared
// when the query enters the paused state.
@@ -625,15 +604,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
// even if wg has no query in the paused state, the following code
will still be executed
// because `handle_paused_queries` adds a <wg, empty set> to
`_paused_queries_list` at the beginning.
-
- bool is_low_watermark = false;
- bool is_high_watermark = false;
- wg->check_mem_used(&is_low_watermark, &is_high_watermark);
- // Not need waiting flush memtable and below low watermark disable
load buffer limit
- if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
- wg->enable_write_buffer_limit(false);
- }
-
if (queries_list.empty()) {
it = _paused_queries_list.erase(it);
continue;
@@ -642,6 +612,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
++it;
}
}
+    // Attention: this has to be here. It means no query is in the cancelling
+    // state and all queries blocked because process memory was not enough
+    // have been resumed.
+ revoking_memory_from_other_query_ = false;
if (!has_query_exceed_process_memlimit &&
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
< 0.05) {
@@ -656,126 +629,55 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
-// Return the expected free bytes if wg's memtable memory is greater than Max.
-int64_t WorkloadGroupMgr::flush_memtable_from_group_(WorkloadGroupPtr wg) {
- // If there are a lot of memtable memory, then wait them flush finished.
- MemTableMemoryLimiter* memtable_limiter =
- doris::ExecEnv::GetInstance()->memtable_memory_limiter();
- int64_t memtable_active_bytes = 0;
- int64_t memtable_queue_bytes = 0;
- int64_t memtable_flush_bytes = 0;
- DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr";
- memtable_limiter->get_workload_group_memtable_usage(
- wg->id(), &memtable_active_bytes, &memtable_queue_bytes,
&memtable_flush_bytes);
- int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
- if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
- max_wg_memtable_bytes) {
- auto max_wg_active_memtable_bytes =
(int64_t)(static_cast<double>(max_wg_memtable_bytes) *
-
config::load_max_wg_active_memtable_percent);
- // There are many table in flush queue, just waiting them flush
finished.
- if (memtable_active_bytes < max_wg_active_memtable_bytes) {
- LOG_EVERY_T(INFO, 60) << wg->name()
- << " load memtable size is: " <<
memtable_active_bytes << ", "
- << memtable_queue_bytes << ", " <<
memtable_flush_bytes
- << ", load buffer limit is: " <<
max_wg_memtable_bytes
- << " wait for flush finished to release more
memory";
- return memtable_queue_bytes + memtable_flush_bytes;
- } else {
- // Flush some memtables(currently written) to flush queue.
- memtable_limiter->flush_workload_group_memtables(
- wg->id(), memtable_active_bytes -
max_wg_active_memtable_bytes);
- LOG_EVERY_T(INFO, 60) << wg->name()
- << " load memtable size is: " <<
memtable_active_bytes << ", "
- << memtable_queue_bytes << ", " <<
memtable_flush_bytes
- << ", flush some active memtable to revoke
memory";
- return memtable_queue_bytes + memtable_flush_bytes +
memtable_active_bytes -
- max_wg_active_memtable_bytes;
- }
- }
- return 0;
-}
-
-// Revoke memory from workload group that exceed it's limit. For example, if
the wg's limit is 10g, but used 12g
-// then should revoke 2g from the group.
-int64_t WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_(
- std::shared_ptr<ResourceContext> requestor, int64_t need_free_mem) {
- int64_t freed_mem = 0;
+// Find the workload group that could revoke lot of memory:
+// 1. workload group = max(total used memory - min memory that should reserved
for it)
+// 2. revoke 10% memory of the workload group that exceeded. For example, if
the workload group exceed 10g,
+// then revoke 1g memory.
+// 3. After revoke memory, go to the loop and wait for the query to be
cancelled and check again.
+int64_t WorkloadGroupMgr::revoke_memory_from_other_groups_() {
MonotonicStopWatch watch;
watch.start();
std::unique_ptr<RuntimeProfile> profile =
-
std::make_unique<RuntimeProfile>("RevokeMemoryFromOtherOvercommitedGroups");
+ std::make_unique<RuntimeProfile>("RevokeMemoryFromOtherGroups");
- using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
- auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
- return left.second < right.second;
- };
- std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>,
decltype(cmp)>
- exceeded_memory_heap(cmp);
- int64_t total_exceeded_memory = 0;
+ WorkloadGroupPtr max_wg = nullptr;
+ int64_t max_exceeded_memory = 0;
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (auto& workload_group : _workload_groups) {
- // TODO should use min memory percent to check
- if (!workload_group.second->exceed_limit()) {
+ int64_t min_memory_limit =
workload_group.second->min_memory_limit();
+ int64_t total_used_memory =
workload_group.second->total_mem_used();
+ if (total_used_memory <= min_memory_limit) {
+                // min memory is reserved for this workload group; if it uses
+                // less than min memory, do not revoke memory from it.
continue;
}
- if (requestor->workload_group() != nullptr &&
- workload_group.second->id() ==
requestor->workload_group()->id()) {
- continue;
+ if (total_used_memory - min_memory_limit > max_exceeded_memory) {
+ max_wg = workload_group.second;
+ max_exceeded_memory = total_used_memory - min_memory_limit;
}
- auto exceeded_memory =
- workload_group.second->memory_used() -
workload_group.second->memory_limit();
- exceeded_memory_heap.emplace(workload_group.second,
exceeded_memory);
- total_exceeded_memory += exceeded_memory;
}
}
-
- auto revoke_reason = fmt::format(
- "{} try reserve {} bytes failed, revoke memory from other
overcommited groups",
- requestor->memory_context()->mem_tracker()->label(),
need_free_mem);
+ if (max_wg == nullptr) {
+ return 0;
+ }
+ if (max_exceeded_memory < 1 << 27) {
+ LOG(INFO) << "The workload group that exceed most memory is :"
+ << max_wg->memory_debug_string() << ", max_exceeded_memory: "
+ << PrettyPrinter::print(max_exceeded_memory, TUnit::BYTES)
+ << " less than 128MB, no need to revoke memory";
+ return 0;
+ }
+ int64_t freed_mem = static_cast<int64_t>((double)max_exceeded_memory *
0.1);
+ // Revoke 10% of memory from the workload group that exceed most memory
+ max_wg->revoke_memory(freed_mem, "exceed_memory", profile.get());
+ std::stringstream ss;
+ profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
- "[MemoryGC] start
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, {}, "
- "number of overcommited groups: {}, total exceeded memory: {}.",
- revoke_reason, exceeded_memory_heap.size(),
- PrettyPrinter::print_bytes(total_exceeded_memory));
- Defer defer {[&]() {
- std::stringstream ss;
- profile->pretty_print(&ss);
- LOG(INFO) << fmt::format(
- "[MemoryGC] end
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, "
- "{}, number of overcommited groups: {}, free memory {}.
cost(us): {}, details: {}",
- revoke_reason, exceeded_memory_heap.size(),
PrettyPrinter::print_bytes(freed_mem),
- watch.elapsed_time() / 1000, ss.str());
- }};
-
- // 1. check memtable usage, and try to flush them and not wait for
finished.
- // TODO, there are two problems with flushing the memtable of other
overcommited groups:
- // 1. When should enable_write_buffer_limit be set back to false?
- // 2. Flushing the memtable may be slow, current query may have to
wait for a long time.
- // auto heap_copy = heap;
- // while (!heap_copy.empty() && need_free_mem - freed_mem > 0 &&
- // !requestor->task_controller()->is_cancelled()) {
- // auto [wg, sort_mem] = heap_copy.top();
- // heap_copy.pop();
- // if (wg->exceed_limit() && !wg->enable_write_buffer_limit()) { // is
overcommited
- // int64_t flushed_memtable_bytes = flush_memtable_from_group_(wg);
- // if (flushed_memtable_bytes > 0) {
- // wg->enable_write_buffer_limit(true);
- // }
- // freed_mem += flushed_memtable_bytes;
- // }
- // }
-
- // 2. cancel top usage query in other overcommit group, one by one.
- // Sort all memory limiter in all overcommit wg, and cancel the top usage
task that with most memory.
- // Maybe not valid because it's memory not exceed limit.
- while (!exceeded_memory_heap.empty() && need_free_mem - freed_mem > 0 &&
- !requestor->task_controller()->is_cancelled()) {
- auto [wg, exceeded_memory] = exceeded_memory_heap.top();
- exceeded_memory_heap.pop();
- freed_mem += wg->revoke_memory(std::min(exceeded_memory, need_free_mem
- freed_mem),
- revoke_reason, profile.get());
- }
+ "[MemoryGC] process memory not enough, revoke memory from
workload_group: {}, "
+ "free memory {}. cost(us): {}, details: {}",
+ max_wg->memory_debug_string(),
PrettyPrinter::print_bytes(freed_mem),
+ watch.elapsed_time() / 1000, ss.str());
return freed_mem;
}
@@ -836,6 +738,10 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<ResourceContex
->memory_profile()
->process_memory_detail_str());
LOG_LONG_STRING(INFO, log_str);
+                    // Disabling memory reservation will enable the query-level memory
+                    // check; if the query needs more memory than its memory limit,
+                    // it will be killed.
+                    // No need to set memlimit = adjusted_mem_limit because the workload
+                    // group refresher thread will update it automatically.
requestor->task_controller()->disable_reserve_memory();
requestor->task_controller()->set_memory_sufficient(true);
return true;
@@ -923,40 +829,23 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<ResourceContex
void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool
enable_hard_limit) {
auto wg_mem_limit = wg->memory_limit();
auto all_resource_ctxs = wg->resource_ctxs();
- bool is_low_watermark = false;
- bool is_high_watermark = false;
- wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+ bool exceed_low_watermark = false;
+ bool exceed_high_watermark = false;
+ wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
int64_t wg_high_water_mark_limit =
(int64_t)(static_cast<double>(wg_mem_limit) *
wg->memory_high_watermark() * 1.0 / 100);
- int64_t memtable_usage = wg->write_buffer_size();
int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
- if (memtable_usage > wg->write_buffer_limit()) {
- wg_high_water_mark_except_load = wg_high_water_mark_limit -
wg->write_buffer_limit();
- } else {
- wg_high_water_mark_except_load =
- wg_high_water_mark_limit - memtable_usage - 10 * 1024 * 1024;
- }
std::string debug_msg;
- if (is_high_watermark || is_low_watermark) {
+ if (exceed_high_watermark || exceed_low_watermark) {
debug_msg = fmt::format(
"\nWorkload Group {}: mem limit: {}, mem used: {}, "
- "high water mark mem limit: {}, load memtable usage: {}, used
ratio: {}",
+ "high water mark mem limit: {}, used ratio: {}",
wg->name(), PrettyPrinter::print(wg->memory_limit(),
TUnit::BYTES),
PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
- PrettyPrinter::print(memtable_usage, TUnit::BYTES),
(double)(wg->total_mem_used()) /
static_cast<double>(wg_mem_limit));
}
- // If reached low watermark, then enable load buffer limit
- if (is_low_watermark) {
- wg->enable_write_buffer_limit(true);
- }
- // Both enable overcommit and not enable overcommit, if user set slot
memory policy
- // then we will replace the memtracker's memlimit with
- if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
- return;
- }
int32_t total_used_slot_count = 0;
int32_t total_slot_count = wg->total_query_slot_count();
// calculate total used slot count
@@ -977,13 +866,20 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
if (!resource_ctx) {
continue;
}
- if (is_low_watermark) {
+ if (exceed_low_watermark) {
resource_ctx->task_controller()->set_low_memory_mode(true);
}
int64_t query_weighted_mem_limit = 0;
int64_t expected_query_weighted_mem_limit = 0;
- // If the query enable hard limit, then it should not use the soft
limit
- if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
+ if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+ query_weighted_mem_limit =
resource_ctx->memory_context()->user_set_mem_limit();
+ // If the policy is NONE, we use the query's memory limit. but the
query's memory limit
+ // should not be greater than the workload group's memory limit.
+ if (query_weighted_mem_limit > wg_mem_limit) {
+ query_weighted_mem_limit = wg_mem_limit;
+ }
+ expected_query_weighted_mem_limit = query_weighted_mem_limit;
+ } else if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
// TODO, `Policy::FIXED` expects `all_query_used_slot_count <
wg_total_slot_count`,
// which is controlled when query is submitted
// DCEHCK(total_used_slot_count <= total_slot_count);
@@ -991,6 +887,7 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
LOG(WARNING)
<< "Query " <<
print_id(resource_ctx->task_controller()->task_id())
<< " enabled hard limit, but the slot count < 1, could
not take affect";
+ continue;
} else {
// If the query enable hard limit, then not use weighted info
any more, just use the settings limit.
query_weighted_mem_limit =
@@ -999,7 +896,7 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
total_slot_count);
expected_query_weighted_mem_limit = query_weighted_mem_limit;
}
- } else {
+ } else if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DYNAMIC) {
// If low water mark is not reached, then use process memory limit
as query memory limit.
// It means it will not take effect.
// If there are some query in paused list, then limit should take
effect.
@@ -1011,7 +908,7 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
resource_ctx->task_controller()->get_slot_count() * 1.0 /
total_used_slot_count)
: wg_high_water_mark_except_load;
- if (!is_low_watermark && !enable_hard_limit) {
+ if (!exceed_low_watermark && !enable_hard_limit) {
query_weighted_mem_limit = wg_high_water_mark_except_load;
} else {
query_weighted_mem_limit = expected_query_weighted_mem_limit;
@@ -1021,7 +918,11 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
// If the query is a pure load task, then should not modify its limit.
Or it will reserve
// memory failed and we did not hanle it.
if (!resource_ctx->task_controller()->is_pure_load_task()) {
-
resource_ctx->memory_context()->set_mem_limit(query_weighted_mem_limit);
+ // If user's set mem limit is less than query weighted mem limit,
then should not modify its limit.
+ // Use user settings.
+ if (resource_ctx->memory_context()->user_set_mem_limit() >
query_weighted_mem_limit) {
+
resource_ctx->memory_context()->set_mem_limit(query_weighted_mem_limit);
+ }
resource_ctx->memory_context()->set_adjusted_mem_limit(
expected_query_weighted_mem_limit);
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 87e7943d436..20f8a926148 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -46,11 +46,10 @@ public:
std::chrono::system_clock::time_point enqueue_at;
size_t last_mem_usage {0};
double cache_ratio_ {0.0};
- bool any_wg_exceed_limit_ {false};
int64_t reserve_size_ {0};
PausedQuery(std::shared_ptr<ResourceContext> resource_ctx_, double
cache_ratio,
- bool any_wg_exceed_limit, int64_t reserve_size);
+ int64_t reserve_size);
int64_t elapsed_time() const {
auto now = std::chrono::system_clock::now();
@@ -76,11 +75,15 @@ public:
WorkloadGroupPtr get_group(std::vector<uint64_t>& id_list);
+    // This method is used by the workload group listener to update an internal
+    // workload group's id. It does not acquire locks, so it should be called
+    // in a locked context.
+ void reset_workload_group_id(std::string workload_group_name, uint64_t
new_id);
+
void do_sweep();
void stop();
- void refresh_wg_weighted_memory_limit();
+ void refresh_workload_group_memory_state();
void get_wg_resource_usage(vectorized::Block* block);
@@ -99,11 +102,9 @@ private:
WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo&
workload_group_info);
- int64_t flush_memtable_from_group_(WorkloadGroupPtr wg);
bool handle_single_query_(const std::shared_ptr<ResourceContext>&
requestor,
size_t size_to_reserve, int64_t time_in_queue,
Status paused_reason);
- int64_t revoke_memory_from_other_overcommited_groups_(
- std::shared_ptr<ResourceContext> requestor, int64_t need_free_mem);
+ int64_t revoke_memory_from_other_groups_();
void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit);
std::shared_mutex _group_mutex;
@@ -115,6 +116,9 @@ private:
// workload group, because we need do some coordinate work globally.
std::mutex _paused_queries_lock;
std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
+    // If any query is cancelled when process memory is not enough, we set this
+    // to true. When there is no query in the cancel state, this var is set
+    // to false.
};
} // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h
b/be/src/runtime/workload_management/memory_context.h
index 0e04a624ccc..027f2fcb61d 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -89,13 +89,14 @@ public:
void set_mem_limit(int64_t new_mem_limit) const {
mem_tracker_->set_limit(new_mem_limit); }
int64_t mem_limit() const { return mem_tracker_->limit(); }
+ int64_t user_set_mem_limit() const { return user_set_mem_limit_; }
+
// The new memlimit should be less than user set memlimit.
void set_adjusted_mem_limit(int64_t new_mem_limit) {
adjusted_mem_limit_ = std::min<int64_t>(new_mem_limit,
user_set_mem_limit_);
}
// Expected mem limit is the limit when workload group reached limit.
int64_t adjusted_mem_limit() { return adjusted_mem_limit_; }
- void effect_adjusted_mem_limit() { set_mem_limit(adjusted_mem_limit_); }
int64_t current_memory_bytes() const { return mem_tracker_->consumption();
}
int64_t peak_memory_bytes() const { return
mem_tracker_->peak_consumption(); }
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp
b/be/src/runtime/workload_management/query_task_controller.cpp
index 6f7ac130d3d..43ef520b794 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -24,8 +24,9 @@
namespace doris {
#include "common/compile_check_begin.h"
-std::unique_ptr<TaskController> QueryTaskController::create(QueryContext*
query_ctx) {
- return QueryTaskController::create_unique(query_ctx->shared_from_this());
+std::unique_ptr<TaskController> QueryTaskController::create(
+ std::shared_ptr<QueryContext> query_ctx) {
+ return QueryTaskController::create_unique(query_ctx);
}
bool QueryTaskController::is_cancelled() const {
diff --git a/be/src/runtime/workload_management/query_task_controller.h
b/be/src/runtime/workload_management/query_task_controller.h
index 6a1d11076c4..64681177f92 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -29,7 +29,7 @@ class QueryTaskController : public TaskController {
ENABLE_FACTORY_CREATOR(QueryTaskController);
public:
- static std::unique_ptr<TaskController> create(QueryContext* query_ctx);
+ static std::unique_ptr<TaskController>
create(std::shared_ptr<QueryContext> query_ctx);
~QueryTaskController() override = default;
bool is_cancelled() const override;
diff --git a/be/src/runtime/workload_management/task_controller.h
b/be/src/runtime/workload_management/task_controller.h
index 0ecf1d28541..de3a3db6fef 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -77,6 +77,8 @@ public:
return cancel_impl(reason);
}
+ int64_t cancel_elapsed_millis() const { return MonotonicMillis() -
cancelled_time_; }
+
virtual bool cancel_impl(const Status& reason) { return false; }
int64_t cancelled_time() const { return cancelled_time_; }
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index bbaee48ceca..ae2a6ab2a93 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -564,8 +564,8 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
- _state->workload_group(), [state = _state]() { return
state->is_cancelled(); });
+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+ [state = _state]() { return state->is_cancelled(); });
if (_state->is_cancelled()) {
return _state->cancel_reason();
}
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
index e372f5443ce..bf7b0d5834b 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -169,7 +169,7 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
ASSERT_TRUE(res.ok());
}
static_cast<void>(mem_limiter->init(100));
- mem_limiter->_handle_memtable_flush(nullptr, nullptr);
+ mem_limiter->handle_memtable_flush(nullptr);
CHECK_EQ(0, mem_limiter->mem_usage());
res = delta_writer->close();
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index 8b5e3a8796f..d3c589552f2 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -125,6 +125,8 @@ TEST_F(WorkloadGroupManagerTest,
get_or_create_workload_group) {
ASSERT_EQ(wg->id(), 0);
}
+// Query is paused because it exceeds its query mem limit; after waiting in the
+// paused queue for spill_in_paused_queue_timeout_ms it should be resumed.
TEST_F(WorkloadGroupManagerTest, query_exceed) {
auto wg = _wg_manager->get_or_create_workload_group({});
auto query_context = _generate_on_query(wg);
@@ -132,19 +134,21 @@ TEST_F(WorkloadGroupManagerTest, query_exceed) {
query_context->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
query_context->query_mem_tracker()->consume(1024 * 4);
+ std::cout << config::spill_in_paused_queue_timeout_ms << std::endl;
+
_wg_manager->add_paused_query(query_context->resource_ctx(), 1024L * 1024
* 1024,
Status::Error(ErrorCode::QUERY_MEMORY_EXCEEDED, "test"));
{
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
- << "pasued queue should not be empty";
+ << "paused queue should not be empty";
}
query_context->query_mem_tracker()->consume(-1024 * 4);
_run_checking_loop(wg);
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
- ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued
queue should be empty";
+ ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "paused
queue should be empty";
ASSERT_EQ(query_context->is_cancelled(), false) << "query should be not
canceled";
ASSERT_EQ(query_context->resource_ctx()->task_controller()->is_enable_reserve_memory(),
false)
<< "query should disable reserve memory";
@@ -172,7 +176,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed1) {
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued
queue should be empty";
- ASSERT_EQ(query_context->is_cancelled(), false) << "query should be
canceled";
+ ASSERT_EQ(query_context->is_cancelled(), false) << "query should not be
canceled";
}
// TWgSlotMemoryPolicy::NONE
@@ -201,6 +205,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed2) {
// TWgSlotMemoryPolicy::NONE
// query_ctx->workload_group()->exceed_limit() == true
+// query limit > workload group limit
+// query's limit will be set to workload group limit
TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
WorkloadGroupInfo wg_info {
.id = 1, .memory_limit = 1024L * 1024, .slot_mem_policy =
TWgSlotMemoryPolicy::NONE};
@@ -209,12 +215,15 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
query_context->query_mem_tracker()->consume(1024L * 1024 * 4);
+    // Set the adjusted mem limit to be larger than the workload group mem limit.
+
query_context->resource_ctx()->memory_context()->set_adjusted_mem_limit(1024L *
1024 * 10);
+
_wg_manager->add_paused_query(query_context->resource_ctx(), 1024L,
Status::Error(ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED, "test"));
{
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
- << "pasued queue should not be empty";
+ << "paused queue should not be empty";
}
wg->refresh_memory_usage();
@@ -224,10 +233,19 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
// Query was not cancelled, because the query's limit is bigger than the
wg's limit and the wg's policy is NONE.
ASSERT_FALSE(query_context->is_cancelled());
- ASSERT_GT(query_context->resource_ctx()->memory_context()->mem_limit(),
wg->memory_limit());
+ // Its limit == workload group's limit
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
wg->memory_limit());
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
- ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued
queue should be empty";
+ ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty())
+ << "paused queue should be empty, because the query will be
resumed";
+    // Query's memory usage + reserve size > adjusted mem limit, so it will be
+    // resumed and its mem limit will be set to the adjusted size.
+
+    ASSERT_EQ(query_context->resource_ctx()->task_controller()->is_enable_reserve_memory(),
+              true)
+            << "query should enable reserve memory";
+    // The adjusted mem limit is larger than the workload group mem limit, so it
+    // is reset to the workload group mem limit.
+
ASSERT_EQ(query_context->resource_ctx()->memory_context()->adjusted_mem_limit(),
+ wg->memory_limit());
}
// TWgSlotMemoryPolicy::FIXED
@@ -251,7 +269,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
<< "pasued queue should not be empty";
}
- _wg_manager->refresh_wg_weighted_memory_limit();
+ _wg_manager->refresh_workload_group_memory_state();
LOG(INFO) << "***** wg usage " << wg->refresh_memory_usage();
_run_checking_loop(wg);
@@ -260,7 +278,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
LOG(INFO) << "***** query_context->get_mem_limit(): "
<< query_context->resource_ctx()->memory_context()->mem_limit();
const auto delta =
std::abs(query_context->resource_ctx()->memory_context()->mem_limit() -
- ((1024L * 1024 * 100 * 95) / 100 - 10 * 1024 *
1024) / 5);
+ (1024L * 1024 * 100 * 95) / 100 / 5);
ASSERT_LE(delta, 1);
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
@@ -271,6 +289,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
WorkloadGroupInfo wg_info {.id = 1,
.memory_limit = 1024L * 1024 * 100,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100,
.memory_low_watermark = 80,
.memory_high_watermark = 95,
.total_query_slot_count = 5,
@@ -285,10 +305,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
{
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
- << "pasued queue should not be empty";
+ << "paused queue should not be empty";
}
- _wg_manager->refresh_wg_weighted_memory_limit();
+ _wg_manager->refresh_workload_group_memory_state();
LOG(INFO) << "***** wg usage " << wg->refresh_memory_usage();
_run_checking_loop(wg);
@@ -296,8 +316,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
ASSERT_TRUE(query_context->resource_ctx()->task_controller()->paused_reason().ok());
LOG(INFO) << "***** query_context->get_mem_limit(): "
<< query_context->resource_ctx()->memory_context()->mem_limit();
+
+    // Allow a tolerance of the slot count, because the per-query mem limit
+    // computation adds the slot count.
ASSERT_LE(query_context->resource_ctx()->memory_context()->mem_limit(),
- (1024L * 1024 * 100 * 95) / 100);
+ ((1024L * 1024 * 100 * 95) / 100 + 5));
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued
queue should be empty";
@@ -383,90 +405,119 @@ TEST_F(WorkloadGroupManagerTest, query_released) {
ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued
queue should be empty";
}
-TEST_F(WorkloadGroupManagerTest, overcommit1) {
- WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 100};
- WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 100};
- WorkloadGroupInfo wg3_info {.id = 3, .memory_limit = 1024L * 1024 * 100};
- WorkloadGroupInfo wg4_info {.id = 4, .memory_limit = 1024L * 1024 * 1024 *
10};
- WorkloadGroupInfo wg5_info {.id = 5, .memory_limit = 1024L * 1024 * 1024 *
100};
+TEST_F(WorkloadGroupManagerTest, ProcessMemoryNotEnough) {
+ WorkloadGroupInfo wg1_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 1000,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100};
+ WorkloadGroupInfo wg2_info {.id = 2,
+ .memory_limit = 1024L * 1024 * 1000,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100};
+ WorkloadGroupInfo wg3_info {.id = 3,
+ .memory_limit = 1024L * 1024 * 1000,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100};
+
auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);
auto wg3 = _wg_manager->get_or_create_workload_group(wg3_info);
- auto wg4 = _wg_manager->get_or_create_workload_group(wg4_info);
- auto wg5 = _wg_manager->get_or_create_workload_group(wg5_info);
+
EXPECT_EQ(wg1->id(), wg1_info.id);
EXPECT_EQ(wg2->id(), wg2_info.id);
EXPECT_EQ(wg3->id(), wg3_info.id);
- EXPECT_EQ(wg5->id(), wg5_info.id);
+
+ EXPECT_EQ(1024L * 1024 * 100, wg1->min_memory_limit());
auto query_context11 = _generate_on_query(wg1);
+ query_context11->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context11->query_mem_tracker()->consume(1024 * 1024 * 10);
+
+ wg1->refresh_memory_usage();
+ wg2->refresh_memory_usage();
+ wg3->refresh_memory_usage();
+
+    // There is no query in the other workload groups, so revoking memory returns 0.
+ EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
- // wg2 is overcommited, some query is overcommited
+ // If exceed memory less than 128MB, then not revoke
auto query_context21 = _generate_on_query(wg2);
+ query_context21->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context21->query_mem_tracker()->consume(1024 * 1024 * 50);
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 50);
+ EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+    // No workload group's memory usage exceeds its min memory limit.
+ EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
+ ASSERT_FALSE(query_context21->is_cancelled());
+
+ // Add another query that use a lot of memory
auto query_context22 = _generate_on_query(wg2);
+ query_context22->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context22->query_mem_tracker()->consume(1024 * 1024 * 60);
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 110);
+ EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+ // Could not revoke larger than 128MB, not revoke.
+ EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
+ ASSERT_FALSE(query_context21->is_cancelled());
+ ASSERT_FALSE(query_context22->is_cancelled());
+
+ // Add another query that use a lot of memory
auto query_context23 = _generate_on_query(wg2);
+ query_context23->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context23->query_mem_tracker()->consume(1024 * 1024 * 300);
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410);
+ EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+ EXPECT_EQ(31 * 1024 * 1024,
_wg_manager->revoke_memory_from_other_groups_());
+ ASSERT_FALSE(query_context21->is_cancelled());
+ ASSERT_FALSE(query_context22->is_cancelled());
+ ASSERT_TRUE(query_context23->is_cancelled());
+    // Although query23 is cancelled, it is not yet removed from workload group 2,
+    // so its memory usage is still counted.
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410);
+ // clear cancelled query from workload group.
+ wg2->clear_cancelled_resource_ctx();
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 110);
+    // TODO: should cancel the query with the largest memory usage.
+
auto query_context24 = _generate_on_query(wg2);
- auto query_context25 = _generate_on_query(wg2);
- auto query_context26 = _generate_on_query(wg2);
- query_context21->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
- query_context21->query_mem_tracker()->consume(1024 * 1024 * 1024);
- query_context22->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
- query_context22->query_mem_tracker()->consume(1024 * 1024 * 64);
- query_context23->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
- query_context23->query_mem_tracker()->consume(1024 * 1024 * 10);
- query_context24->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
- query_context24->query_mem_tracker()->consume(1024);
- query_context25->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 512);
- query_context25->query_mem_tracker()->consume(1024 * 1024 * 1024);
- query_context26->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024 * 1024 * 100);
- query_context26->query_mem_tracker()->consume(1024 * 1024 * 1024);
+ query_context24->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context24->query_mem_tracker()->consume(1024 * 1024 * 300);
+ wg2->refresh_memory_usage();
+ EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410); // WG2 exceed 310MB
// wg3 is overcommited, some query is overcommited
auto query_context31 = _generate_on_query(wg3);
- auto query_context32 = _generate_on_query(wg3);
- auto query_context33 = _generate_on_query(wg3);
- query_context31->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
- query_context31->query_mem_tracker()->consume(1024 * 1024 * 1024);
- query_context32->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
- query_context32->query_mem_tracker()->consume(1024 * 1024 * 512);
- query_context33->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 512);
- query_context33->query_mem_tracker()->consume(1024 * 1024 * 1024);
-
- // wg4 not overcommited, query is overcommited
- auto query_context41 = _generate_on_query(wg4);
- query_context41->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024);
- query_context41->query_mem_tracker()->consume(1024L * 1024 * 1024 * 9);
-
- // wg5 disable overcommited, query is overcommited
- auto query_context51 = _generate_on_query(wg5);
- query_context51->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024);
- query_context51->query_mem_tracker()->consume(1024L * 1024 * 1024 * 99);
+ query_context31->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024 * 1024);
+ query_context31->query_mem_tracker()->consume(1024 * 1024 * 500);
+ wg3->refresh_memory_usage();
+ EXPECT_EQ(wg3->total_mem_used(), 1024 * 1024 * 500); // WG3 exceed 400MB
+
+ EXPECT_EQ(40 * 1024 * 1024,
_wg_manager->revoke_memory_from_other_groups_());
wg1->refresh_memory_usage();
wg2->refresh_memory_usage();
wg3->refresh_memory_usage();
- wg4->refresh_memory_usage();
- wg5->refresh_memory_usage();
-
- // step1, wg2 is overcommited largest, cancel some overcommited query,
freed memory less than need_free_mem.
- // step2, wg3 is less overcommited than wg2, cancel some overcommited
query, terminate after freed memory is greater than need_free_mem.
- // query41 in wg4 has largest overcommited, but wg4 not overcommited, so
not cancel query41.
- EXPECT_EQ(_wg_manager->revoke_memory_from_other_overcommited_groups_(
- query_context11->resource_ctx(), 1024L * 1024 * 1024 * 3
+ 1),
- 1024L * 1024 * 1024 * 4);
+ ASSERT_TRUE(query_context31->is_cancelled());
+    // query31 is still in wg3 but already cancelled, so it is not cancelled again.
+ EXPECT_EQ(40 * 1024 * 1024,
_wg_manager->revoke_memory_from_other_groups_());
ASSERT_FALSE(query_context11->is_cancelled());
- ASSERT_TRUE(query_context21->is_cancelled());
+ ASSERT_FALSE(query_context21->is_cancelled());
ASSERT_FALSE(query_context22->is_cancelled());
- ASSERT_FALSE(query_context23->is_cancelled());
ASSERT_FALSE(query_context24->is_cancelled());
- ASSERT_TRUE(query_context25->is_cancelled());
- ASSERT_TRUE(query_context26->is_cancelled());
ASSERT_TRUE(query_context31->is_cancelled());
- ASSERT_FALSE(query_context32->is_cancelled());
- ASSERT_FALSE(query_context33->is_cancelled());
- ASSERT_FALSE(query_context41->is_cancelled());
- ASSERT_FALSE(query_context51->is_cancelled());
+
+ // remove query31 from wg
+ wg3->clear_cancelled_resource_ctx();
+
+ wg1->refresh_memory_usage();
+ wg2->refresh_memory_usage();
+ wg3->refresh_memory_usage();
+    EXPECT_EQ(wg3->total_mem_used(), 0); // WG3 usage drops to 0 after the cancelled query is removed
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 57a80e18c85..b6f2ef2fbd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -587,7 +587,6 @@ public class SchemaTable extends Table {
.column("CPU_USAGE_PERCENT",
ScalarType.createType(PrimitiveType.DOUBLE))
.column("LOCAL_SCAN_BYTES_PER_SECOND",
ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_SCAN_BYTES_PER_SECOND",
ScalarType.createType(PrimitiveType.BIGINT))
- .column("WRITE_BUFFER_USAGE_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
.build())
)
.put("file_cache_statistics",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index f21f6c2dfce..f8e756f1fa2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -68,8 +68,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
public static final String ENABLE_MEMORY_OVERCOMMIT =
"enable_memory_overcommit";
- public static final String WRITE_BUFFER_RATIO = "write_buffer_ratio";
-
public static final String MAX_CONCURRENCY = "max_concurrency";
public static final String MAX_QUEUE_SIZE = "max_queue_size";
@@ -107,7 +105,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
.add(MAX_MEMORY_PERCENT).add(MIN_MEMORY_PERCENT)
.add(MAX_CONCURRENCY).add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT)
.add(SCAN_THREAD_NUM).add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
-
.add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK).add(WRITE_BUFFER_RATIO)
+ .add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK)
.add(COMPUTE_GROUP).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
.add(SLOT_MEMORY_POLICY).build();
@@ -135,7 +133,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND,
"-1");
}
- public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20;
public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "none";
public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES =
new HashSet<String>() {{
add("none");
@@ -168,16 +165,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
this.properties = properties;
this.version = version;
- if (properties.containsKey(WRITE_BUFFER_RATIO)) {
- String loadBufLimitStr = properties.get(WRITE_BUFFER_RATIO);
- if (loadBufLimitStr.endsWith("%")) {
- loadBufLimitStr = loadBufLimitStr.substring(0,
loadBufLimitStr.length() - 1);
- }
- this.properties.put(WRITE_BUFFER_RATIO, loadBufLimitStr);
- } else {
- this.properties.put(WRITE_BUFFER_RATIO,
WRITE_BUFFER_RATIO_DEFAULT_VALUE + "");
- }
-
if (properties.containsKey(SLOT_MEMORY_POLICY)) {
String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
this.properties.put(SLOT_MEMORY_POLICY, slotPolicy);
@@ -378,25 +365,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
+ MIN_MEMORY_PERCENT + " " + minMemPercent);
}
- if (properties.containsKey(WRITE_BUFFER_RATIO)) {
- String writeBufSizeStr = properties.get(WRITE_BUFFER_RATIO);
- String memLimitErr = WRITE_BUFFER_RATIO + " " + writeBufSizeStr
- + " requires a positive int number.";
- if (writeBufSizeStr.endsWith("%")) {
- writeBufSizeStr = writeBufSizeStr.substring(0,
writeBufSizeStr.length() - 1);
- }
- try {
- if (Integer.parseInt(writeBufSizeStr) < 0) {
- throw new DdlException(memLimitErr);
- }
- } catch (NumberFormatException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(memLimitErr, e);
- }
- throw new DdlException(memLimitErr);
- }
- }
-
if (properties.containsKey(SLOT_MEMORY_POLICY)) {
String value = properties.get(SLOT_MEMORY_POLICY).toLowerCase();
if (!AVAILABLE_SLOT_MEMORY_POLICY_VALUES.contains(value)) {
@@ -741,10 +709,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
tWorkloadGroupInfo.setMinMemoryPercent(this.getMinMemoryPercent());
}
- String writeBufferRatioStr = properties.get(WRITE_BUFFER_RATIO);
- if (writeBufferRatioStr != null) {
-
tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr));
- }
String slotMemoryPolicyStr = properties.get(SLOT_MEMORY_POLICY);
if (slotMemoryPolicyStr != null) {
tWorkloadGroupInfo.setSlotMemoryPolicy(findSlotPolicyValueByString(slotMemoryPolicyStr));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 967cb516907..b7cbb082187 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -85,7 +85,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
.add(WorkloadGroup.COMPUTE_GROUP)
.add(WorkloadGroup.READ_BYTES_PER_SECOND)
.add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
- .add(WorkloadGroup.WRITE_BUFFER_RATIO)
.add(WorkloadGroup.SLOT_MEMORY_POLICY)
.add(QueryQueue.RUNNING_QUERY_NUM)
.add(QueryQueue.WAITING_QUERY_NUM)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]