This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 1e96f93bc77 block memtable when memory is not enough (#42418)
1e96f93bc77 is described below
commit 1e96f93bc778cf9ec9915be7d78d4cc67194aca9
Author: yiguolei <[email protected]>
AuthorDate: Fri Oct 25 18:10:47 2024 +0800
block memtable when memory is not enough (#42418)
## Proposed changes
Issue Number: close #xxx
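In brief: when a reserve fails because a workload group is over its memory limit, the workload group manager now tries to revoke memory by flushing some of the group's active memtables and temporarily enables the group's load buffer limit; memtable writers that want to add new batches then block (bounded to about 1s) until the flushes finish. Memtable memory is also registered with the query's MemTrackerLimiter as a "load buffer" and treated like a cache, so `check_limit()` and `try_reserve()` subtract it from consumption.

A minimal, self-contained sketch (not the committed code) of the new wait-before-flush behavior in `handle_workload_group_memtable_flush()`; `WorkloadGroup` here is a hypothetical one-flag stand-in for the real class:

```cpp
#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>

// Simplified stand-in for the real WorkloadGroup: only the flag that
// WorkloadGroupMgr sets while it waits for memtable flushes to free memory.
struct WorkloadGroup {
    bool limited = false;  // hypothetical field, set under memory pressure
    bool enable_load_buffer_limit() const { return limited; }
};

// Mirrors handle_workload_group_memtable_flush(): wait at most
// 10 * 100ms = 1s while the group's load buffer limit is in force, then
// fall through to the normal process-level flush check
// (handle_memtable_flush() in the real code).
void handle_workload_group_memtable_flush(std::shared_ptr<WorkloadGroup> wg) {
    using namespace std::chrono_literals;
    int32_t sleep_times = 10;
    while (wg != nullptr && wg->enable_load_buffer_limit() && sleep_times > 0) {
        std::this_thread::sleep_for(100ms);
        --sleep_times;
    }
    // handle_memtable_flush();  // re-check process memory in the real code
}

int main() {
    auto wg = std::make_shared<WorkloadGroup>();
    wg->limited = true;                        // simulate memory pressure
    handle_workload_group_memtable_flush(wg);  // blocks ~1s, then returns
}
```

The wait is a bounded sleep loop rather than a condition wait because this code cannot observe the cancel flag, so a cancelled load is never stalled for more than one second before it can release its memory.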
---------
Co-authored-by: yiguolei <[email protected]>
Co-authored-by: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
---
be/src/olap/memtable.cpp | 1 +
be/src/olap/memtable_memory_limiter.cpp | 56 ++-
be/src/olap/memtable_memory_limiter.h | 9 +-
be/src/pipeline/exec/file_scan_operator.cpp | 2 +-
be/src/runtime/load_channel.h | 2 +
be/src/runtime/load_channel_mgr.cpp | 3 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 15 +
be/src/runtime/memory/mem_tracker_limiter.h | 26 +-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 18 +-
be/src/runtime/query_context.cpp | 1 +
be/src/runtime/query_context.h | 7 +-
be/src/runtime/runtime_state.cpp | 4 +
be/src/runtime/runtime_state.h | 3 +
be/src/runtime/workload_group/workload_group.cpp | 145 ++++++--
be/src/runtime/workload_group/workload_group.h | 49 +--
.../workload_group/workload_group_manager.cpp | 406 +++++++++++----------
.../workload_group/workload_group_manager.h | 20 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 14 +-
.../resource/workloadgroup/WorkloadGroup.java | 51 ++-
gensrc/thrift/BackendService.thrift | 1 +
gensrc/thrift/PaloInternalService.thrift | 2 +-
22 files changed, 523 insertions(+), 315 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index e0f19b1624d..facbc90c450 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -78,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_mem_tracker = std::make_shared<MemTracker>();
+ _query_thread_context.query_mem_tracker->push_load_buffer(_mem_tracker);
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 7a220ef87ac..2071f814768 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -111,6 +111,21 @@ int64_t MemTableMemoryLimiter::_need_flush() {
return need_flush - _queue_mem_usage;
}
+void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPtr wg) {
+ // Some query may be pending here, waiting for memtables to flush before it can continue
+ // running, so we should wait here.
+ // Wait at most 1s, because this code is not aware of the cancel flag. If the load task is
+ // cancelled, it should release memory quickly.
+ using namespace std::chrono_literals;
+ int32_t sleep_times = 10;
+ while (wg != nullptr && wg->enable_load_buffer_limit() && sleep_times > 0) {
+ std::this_thread::sleep_for(100ms);
+ --sleep_times;
+ }
+ // Check process memory again.
+ handle_memtable_flush();
+}
+
void MemTableMemoryLimiter::handle_memtable_flush() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
@@ -150,20 +165,39 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
}
-void MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id,
- int64_t need_flush_bytes) {
+int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush) {
std::unique_lock<std::mutex> l(_lock);
- _flush_active_memtables(wg_id, need_flush_bytes);
+ return _flush_active_memtables(wg_id, need_flush);
+}
+
+void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
+ int64_t* queue_bytes,
+ int64_t* flush_bytes) {
+ std::unique_lock<std::mutex> l(_lock);
+ *active_bytes = 0;
+ *queue_bytes = 0;
+ *flush_bytes = 0;
+ for (auto it = _writers.begin(); it != _writers.end(); ++it) {
+ if (auto writer = it->lock()) {
+ // If a wg id is specified but does not match, then this writer does not need to flush
+ if (writer->workload_group_id() != wg_id) {
+ continue;
+ }
+ *active_bytes += writer->active_memtable_mem_consumption();
+ *queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
+ *flush_bytes += writer->mem_consumption(MemType::FLUSH);
+ }
+ }
}
-void MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need_flush) {
+int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need_flush) {
if (need_flush <= 0) {
- return;
+ return 0;
}
_refresh_mem_tracker();
if (_active_writers.size() == 0) {
- return;
+ return 0;
}
using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
@@ -211,6 +245,7 @@ void MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need
}
LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size()
<< " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed);
+ return mem_flushed;
}
void MemTableMemoryLimiter::refresh_mem_tracker() {
@@ -245,29 +280,20 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
_flush_mem_usage = 0;
_queue_mem_usage = 0;
_active_mem_usage = 0;
- std::map<uint64_t, doris::MemtableUsage> wg_mem_usages;
_active_writers.clear();
for (auto it = _writers.begin(); it != _writers.end();) {
if (auto writer = it->lock()) {
- if (wg_mem_usages.find(writer->workload_group_id()) == wg_mem_usages.end()) {
- wg_mem_usages.insert({writer->workload_group_id(), {0, 0, 0}});
- }
- auto& wg_mem_usage = wg_mem_usages.find(writer->workload_group_id())->second;
-
// The memtable is currently used by writer to insert blocks.
auto active_usage = writer->active_memtable_mem_consumption();
- wg_mem_usage.active_mem_usage += active_usage;
_active_mem_usage += active_usage;
if (active_usage > 0) {
_active_writers.push_back(writer);
}
auto flush_usage = writer->mem_consumption(MemType::FLUSH);
- wg_mem_usage.flush_mem_usage += flush_usage;
_flush_mem_usage += flush_usage;
auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED);
- wg_mem_usage.queue_mem_usage += write_usage;
_queue_mem_usage += write_usage;
++it;
} else {
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 143edaa8fe5..de2fb802165 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -21,6 +21,7 @@
#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
+#include "runtime/workload_group/workload_group.h"
#include "util/countdown_latch.h"
#include "util/stopwatch.hpp"
@@ -37,13 +38,17 @@ public:
Status init(int64_t process_mem_limit);
+ void handle_workload_group_memtable_flush(WorkloadGroupPtr wg);
// check if the total mem consumption exceeds limit.
// If yes, it will flush memtable to try to reduce memory consumption.
// Every write operation will call this API to check if need flush memtable OR hang
// when memory is not available.
void handle_memtable_flush();
- void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
+ int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
+
+ void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
+ int64_t* queue_bytes, int64_t* flush_bytes);
void register_writer(std::weak_ptr<MemTableWriter> writer);
@@ -61,7 +66,7 @@ private:
bool _hard_limit_reached();
bool _load_usage_low();
int64_t _need_flush();
- void _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
+ int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
void _refresh_mem_tracker();
std::mutex _lock;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 7018c279d35..b7d593350e7 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
auto wg_ptr = state->get_query_ctx()->workload_group();
_max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
- if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) {
+ if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) {
const auto total_slots = wg_ptr->total_query_slot_count();
const auto query_slots = state->get_query_ctx()->get_slot_count();
_max_scanners = _max_scanners * query_slots / total_slots;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 36a8f363ba9..c9ed66f8c74 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -69,6 +69,8 @@ public:
bool is_high_priority() const { return _is_high_priority; }
+ WorkloadGroupPtr workload_group() const { return _query_thread_context.wg_wptr.lock(); }
+
RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; }
RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; }
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index c53cade466b..55db6564488 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -150,7 +150,8 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
- ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+ ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
+ channel->workload_group());
}
// 3. add batch to load channel
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 5c79e0d37a6..487b7d48fae 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -257,6 +257,21 @@ void MemTrackerLimiter::clean_tracker_limiter_group() {
#endif
}
+void MemTrackerLimiter::update_load_buffer_size() {
+ std::lock_guard l(_load_buffer_lock);
+ int64_t total_buf_size = 0;
+ for (auto memtable_tracker = _load_buffers.begin(); memtable_tracker != _load_buffers.end();) {
+ auto m = memtable_tracker->lock();
+ if (m == nullptr) {
+ memtable_tracker = _load_buffers.erase(memtable_tracker);
+ } else {
+ total_buf_size += m->consumption();
+ ++memtable_tracker;
+ }
+ }
+ _load_buffer_size = total_buf_size;
+}
+
void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
MemTrackerLimiter::Type type) {
if (type == Type::GLOBAL) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index bca16290c17..6f0ecdbe975 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -35,6 +35,7 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
+#include "runtime/memory/mem_tracker.h"
#include "runtime/query_statistics.h"
#include "util/string_util.h"
#include "util/uid_util.h"
@@ -45,6 +46,7 @@ class RuntimeProfile;
class MemTrackerLimiter;
constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
+constexpr size_t QUERY_MIN_MEMORY = 32 * 1024 * 1024;
struct TrackerLimiterGroup {
// Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize,
@@ -132,6 +134,7 @@ public:
~MemTrackerLimiter();
Type type() const { return _type; }
+ void set_overcommit(bool enable) { _enable_overcommit = enable; }
const std::string& label() const { return _label; }
std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
int64_t group_num() const { return _group_num; }
@@ -141,7 +144,6 @@ public:
Status check_limit(int64_t bytes = 0);
// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();
- bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; }
void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
@@ -208,7 +210,8 @@ public:
if (UNLIKELY(bytes == 0)) {
return true;
}
- bool rt = _mem_counter.try_add(bytes, _limit);
+ // Reserve will check limit, should ignore load buffer size.
+ bool rt = _mem_counter.try_add(bytes - _load_buffer_size, _limit);
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
@@ -234,6 +237,15 @@ public:
static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
+ void push_load_buffer(std::shared_ptr<MemTracker> memtable_tracker) {
+ std::lock_guard l(_load_buffer_lock);
+ _load_buffers.push_back(memtable_tracker);
+ }
+
+ void update_load_buffer_size();
+
+ int64_t load_buffer_size() const { return _load_buffer_size; }
+
void print_log_usage(const std::string& msg);
void enable_print_log_usage() { _enable_print_log_usage = true; }
@@ -300,6 +312,7 @@ private:
*/
Type _type;
+ bool _enable_overcommit = true;
// label used in the make snapshot, not guaranteed unique.
std::string _label;
@@ -327,6 +340,10 @@ private:
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
+ std::mutex _load_buffer_lock;
+ std::vector<std::weak_ptr<MemTracker>> _load_buffers;
+ std::atomic<int64_t> _load_buffer_size = 0;
+
struct AddressSanitizer {
size_t size;
std::string stack_trace;
@@ -356,10 +373,11 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
}
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
- if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memory_overcommit)) {
+ if (bytes <= 0 || _enable_overcommit) {
return Status::OK();
}
- if (_limit > 0 && consumption() + bytes > _limit) {
+ // check limit should ignore memtable size, because it is treated as a cache
+ if (_limit > 0 && consumption() - _load_buffer_size + bytes > _limit) {
return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}",
MemCounter::print_bytes(bytes),
tracker_limit_exceeded_str()));
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 9a316032bc9..83caf753aed 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -286,21 +286,6 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();
- // For wg with overcommit, the limit will only task affect when memory > soft limit
- // wg mgr will change wg's hard limit property.
- if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
- !wg_ptr->has_changed_to_hard_limit()) {
- // Only do a check here, do not real reserve. If we could reserve it, it is better, but the logic is too complicated.
- if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
- return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
- "reserve memory failed, size: {}, because {}",
- PrettyPrinter::print(size, TUnit::BYTES),
- GlobalMemoryArbitrator::process_mem_log_str());
- } else {
- doris::GlobalMemoryArbitrator::release_process_reserved_memory(size);
- return Status::OK();
- }
- }
if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because query memory exceeded, memory tracker "
@@ -313,7 +298,8 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format(
- "reserve memory failed, size: {}, because wg memory exceeded, wg info: {}",
+ "reserve memory failed, size: {}, because workload group memory exceeded, "
+ "workload group: {}",
PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a1c89394c7b..2898ea796b6 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -149,6 +149,7 @@ void QueryContext::_init_query_mem_tracker() {
if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}
+ query_mem_tracker->set_overcommit(enable_mem_overcommit());
_user_set_mem_limit = bytes_limit;
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index f16bd0fcf95..1b800d772f5 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -253,10 +253,9 @@ public:
return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
}
- bool enable_query_slot_hard_limit() const {
- return _query_options.__isset.enable_query_slot_hard_limit
- ? _query_options.enable_query_slot_hard_limit
- : false;
+ bool enable_mem_overcommit() const {
+ return _query_options.__isset.enable_mem_overcommit ? _query_options.enable_mem_overcommit
+ : false;
}
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d4e3cba36cd..f146e510e8e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -308,6 +308,10 @@ std::shared_ptr<MemTrackerLimiter> RuntimeState::query_mem_tracker() const {
return _query_mem_tracker;
}
+WorkloadGroupPtr RuntimeState::workload_group() {
+ return _query_ctx->workload_group();
+}
+
bool RuntimeState::log_error(const std::string& error) {
std::lock_guard<std::mutex> l(_error_log_lock);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 66ae64d00e3..5c493390dd7 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -44,6 +44,7 @@
#include "io/fs/file_system.h"
#include "io/fs/s3_file_system.h"
#include "runtime/task_execution_context.h"
+#include "runtime/workload_group/workload_group.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/columns/columns_number.h"
@@ -451,6 +452,8 @@ public:
QueryContext* get_query_ctx() { return _query_ctx; }
+ WorkloadGroupPtr workload_group();
+
void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_query_mem_tracker = tracker;
}
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6fbd7b752df..cb105816805 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -51,12 +51,14 @@ const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
// This is a invalid value, and should ignore this value during usage
const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
+const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
: _id(tg_info.id),
_name(tg_info.name),
_version(tg_info.version),
_memory_limit(tg_info.memory_limit),
+ _load_buffer_ratio(tg_info.load_buffer_ratio),
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
_cpu_share(tg_info.cpu_share),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
@@ -83,29 +85,25 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
- _load_buffer_limit = (int64_t)(_memory_limit * 0.2);
- // Its initial value should equal to memory limit, or it will be 0 and all reserve memory request will failed.
- _weighted_memory_limit = _memory_limit;
}
std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
- auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1);
+ auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1);
return fmt::format(
"WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
"total_query_slot_count={}, "
- "memory_limit = {}, "
- "enable_memory_overcommit = {}, weighted_memory_limit = {}, total_mem_used = {},"
+ "memory_limit = {}, load_buffer_ratio = {}%, "
+ "enable_memory_overcommit = {}, total_mem_used = {},"
"wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = "
"{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, _version, cpu_share(), _total_query_slot_count,
- PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+ PrettyPrinter::print(_memory_limit, TUnit::BYTES), _load_buffer_ratio,
_enable_memory_overcommit ? "true" : "false",
- PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES),
mem_used_ratio, _spill_low_watermark, _spill_high_watermark,
cpu_hard_limit(),
@@ -115,21 +113,18 @@ std::string WorkloadGroup::debug_string() const {
}
bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
- // If a group is enable memory overcommit, then not need check the limit
- // It is always true, and it will only fail when process memory is not
- // enough.
- if (_enable_memory_overcommit) {
- if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
- return false;
- } else {
- return true;
- }
- }
auto realtime_total_mem_used =
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
- return false;
+ // If a group has memory overcommit enabled, then there is no need to check the limit:
+ // it always succeeds, and it will only fail when process memory is not
+ // enough.
+ if (_enable_memory_overcommit) {
+ return true;
+ } else {
+ return false;
+ }
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
return true;
@@ -138,16 +133,15 @@ bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
std::string WorkloadGroup::memory_debug_string() const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
- auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1);
+ auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1);
return fmt::format(
"WorkloadGroup[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
- "{}, weighted_memory_limit = {}, total_mem_used = {},"
+ "{}, total_mem_used = {},"
"wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = "
"{}, "
"spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
_id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
- PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES),
mem_used_ratio, _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
@@ -181,6 +175,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
_total_query_slot_count = tg_info.total_query_slot_count;
+ _load_buffer_ratio = tg_info.load_buffer_ratio;
} else {
return;
}
@@ -188,9 +183,9 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
}
// MemtrackerLimiter is not removed during query context release, so it should be removed here.
-int64_t WorkloadGroup::make_memory_tracker_snapshots(
- std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
+int64_t WorkloadGroup::refresh_memory_usage() {
int64_t used_memory = 0;
+ int64_t load_buffer_size = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (auto trackerWptr = mem_tracker_group.trackers.begin();
@@ -199,16 +194,16 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
if (tracker == nullptr) {
trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
} else {
- if (tracker_snapshots != nullptr) {
- tracker_snapshots->insert(tracker_snapshots->end(), tracker);
- }
+ tracker->update_load_buffer_size();
used_memory += tracker->consumption();
+ load_buffer_size += tracker->load_buffer_size();
++trackerWptr;
}
}
}
// refresh total memory used.
_total_mem_used = used_memory;
+ _load_buffer_size = load_buffer_size;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
@@ -218,7 +213,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
}
int64_t WorkloadGroup::memory_used() {
- return make_memory_tracker_snapshots(nullptr);
+ return refresh_memory_usage();
}
void WorkloadGroup::do_sweep() {
@@ -255,6 +250,91 @@ void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> m
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}
+int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) {
+ if (need_free_mem <= 0) {
+ return 0;
+ }
+ int64_t used_memory = memory_used();
+ // Clamp need_free_mem to the amount by which the group exceeds its limit.
+ need_free_mem = std::min<int64_t>(used_memory - _memory_limit, need_free_mem);
+ if (need_free_mem <= 0) {
+ return 0;
+ }
+
+ int64_t freed_mem = 0;
+
+ std::string cancel_str =
+ fmt::format("Kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, backend:{}.",
+ _id, _name, MemCounter::print_bytes(used_memory),
+ MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost());
+
+ auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
+ const std::string& label) {
+ return fmt::format(
+ "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, "
+ "Execute again after enough memory, details see be.INFO.",
+ cancel_str, label, MemCounter::print_bytes(mem_consumption),
+ GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
+ };
+
+ LOG(INFO) << fmt::format(
+ "Workload group start gc, id:{} name:{}, memory limit: {}, used: {}, "
+ "need_free_mem: {}.",
+ _id, _name, _memory_limit, used_memory, need_free_mem);
+ Defer defer {[&]() {
+ LOG(INFO) << fmt::format(
+ "Workload group finished gc, id:{} name:{}, memory limit: {}, used: "
+ "{}, need_free_mem: {}, freed memory: {}.",
+ _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem);
+ }};
+
+ // 1. free top overcommit query
+ RuntimeProfile* tmq_profile = profile->create_child(
+ fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), true, true);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, _mem_tracker_limiter_pool,
+ cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+ // To be compatible with the non-group's gc logic, minorGC just gc overcommit query
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
+ }
+ auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
+ return fmt::format(
+ "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute "
+ "again "
+ "after enough memory, details see be.INFO.",
+ cancel_str, label, MemCounter::print_bytes(mem_consumption),
+ GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
+ };
+ // 2. free top usage query
+ tmq_profile =
+ profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name {}", _name), true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, _mem_tracker_limiter_pool,
+ cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
+ }
+
+ // 3. free top overcommit load
+ tmq_profile = profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", _name),
+ true, true);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool,
+ cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
+ }
+
+ // 4. free top usage load
+ tmq_profile =
+ profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", _name), true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool,
+ cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+ return freed_mem;
+}
+
int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
if (need_free_mem <= 0) {
return 0;
@@ -466,6 +546,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
total_query_slot_count = tworkload_group_info.total_query_slot_count;
}
+ // 17 load buffer memory limit
+ int load_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE;
+ if (tworkload_group_info.__isset.load_buffer_ratio) {
+ load_buffer_ratio = tworkload_group_info.load_buffer_ratio;
+ }
+
return {.id = tg_id,
.name = name,
.cpu_share = cpu_share,
@@ -481,7 +567,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.spill_high_watermark = spill_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second,
- .total_query_slot_count = total_query_slot_count};
+ .total_query_slot_count = total_query_slot_count,
+ .load_buffer_ratio = load_buffer_ratio};
}
void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index f2e2b395ec3..8b1e30fea6a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -80,14 +80,14 @@ public:
int64_t total_mem_used() const { return _total_mem_used; }
- void set_weighted_memory_limit(int64_t weighted_memory_limit) {
- _weighted_memory_limit = weighted_memory_limit;
- }
+ int64_t load_mem_used() const { return _load_buffer_size; }
+
+ void enable_load_buffer_limit(bool enable_limit) { _enable_load_buffer_limit = enable_limit; }
+
+ bool enable_load_buffer_limit() const { return _enable_load_buffer_limit; }
// make memory snapshots and refresh total memory used at the same time.
- int64_t make_memory_tracker_snapshots(
- std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
- // call make_memory_tracker_snapshots, so also refresh total memory used.
+ int64_t refresh_memory_usage();
int64_t memory_used();
void do_sweep();
@@ -120,20 +120,6 @@ public:
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}
- void update_load_mem_usage(int64_t active_bytes, int64_t queue_bytes, int64_t flush_bytes) {
- std::unique_lock<std::shared_mutex> wlock(_mutex);
- _active_mem_usage = active_bytes;
- _queue_mem_usage = queue_bytes;
- _flush_mem_usage = flush_bytes;
- }
-
- void get_load_mem_usage(int64_t* active_bytes, int64_t* queue_bytes, int64_t* flush_bytes) {
- std::shared_lock<std::shared_mutex> r_lock(_mutex);
- *active_bytes += _active_mem_usage;
- *queue_bytes += _queue_mem_usage;
- *flush_bytes += _flush_mem_usage;
- }
-
std::string debug_string() const;
std::string memory_debug_string() const;
@@ -211,11 +197,9 @@ public:
}
int64_t get_remote_scan_bytes_per_second();
- int64_t load_buffer_limit() { return _load_buffer_limit; }
+ int64_t load_buffer_limit() const { return _memory_limit * _load_buffer_ratio / 100; }
- bool has_changed_to_hard_limit() const { return _has_changed_hard_limit; }
-
- void change_to_hard_limit(bool to_hard_limit) { _has_changed_hard_limit = to_hard_limit; }
+ int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile);
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
@@ -226,19 +210,11 @@ private:
// For example, load memtable, write to parquet.
// If the wg's memory reached high water mark, then the load buffer
// will be restricted to this limit.
- int64_t _load_buffer_limit;
- std::atomic<bool> _has_changed_hard_limit = false;
-
- // memory used by load memtable
- int64_t _active_mem_usage = 0;
- int64_t _queue_mem_usage = 0;
- int64_t _flush_mem_usage = 0;
-
- // `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory.
- // more detailed description in `refresh_wg_weighted_memory_limit`.
- std::atomic<int64_t> _weighted_memory_limit {0}; //
- // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called.
+ int64_t _load_buffer_ratio = 0;
+ std::atomic<bool> _enable_load_buffer_limit = false;
+
std::atomic_int64_t _total_mem_used = 0; // bytes
+ std::atomic_int64_t _load_buffer_size = 0;
std::atomic_int64_t _wg_refresh_interval_memory_growth;
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
@@ -296,6 +272,7 @@ struct WorkloadGroupInfo {
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
+ const int load_buffer_ratio = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index b42aeeb1b43..784c09555c4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -162,18 +162,16 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
std::list<std::shared_ptr<MemTrackerLimiter>>();
};
+
void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
// 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time)
// and calculate total memory used of all queries.
int64_t all_workload_groups_mem_usage = 0;
- std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
bool has_wg_exceed_limit = false;
for (auto& [wg_id, wg] : _workload_groups) {
- wgs_mem_info[wg_id].total_mem_used =
- wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
- all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
+ all_workload_groups_mem_usage += wg->refresh_memory_usage();
if (wg->exceed_limit()) {
has_wg_exceed_limit = true;
}
@@ -219,7 +217,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
- update_queries_limit(wg.second, false);
+ update_queries_limit_(wg.second, false);
}
}
@@ -264,43 +262,27 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
}
}
-/**
- * 1. When Process's memory is lower than soft limit, then all workload group will be converted to hard limit (Exception: there is only one workload group).
- * 2. Reserve logic for workload group that is soft limit take no effect, it will always return success.
- * 3. QueryLimit for streamload,routineload,group commit, take no affect, it will always return success, but workload group's hard limit will take affect.
- * 4. See handle_non_overcommit_wg_paused_queries for hard limit logic.
- */
-void WorkloadGroupMgr::handle_paused_queries() {
- handle_non_overcommit_wg_paused_queries();
- handle_overcommit_wg_paused_queries();
-}
-
/**
* Strategy 1: A revocable query should not have any running task(PipelineTask).
* strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit
* strategy 3: If any query exceed process memlimit, then should clear all caches.
* strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it.
* strategy 5: If any query exceed process's memlimit and cache is zero, then do following:
- * 1. cancel other wg's(soft limit) query that exceed limit
- * 2. spill disk
- * 3. cancel it self.
*/
-void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
+void WorkloadGroupMgr::handle_paused_queries() {
const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
std::unique_lock<std::mutex> lock(_paused_queries_lock);
- std::vector<std::weak_ptr<QueryContext>> resume_after_gc;
+ bool has_revoked_from_other_group = false;
for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
auto& queries_list = it->second;
const auto& wg = it->first;
- if (queries_list.empty()) {
- it = _paused_queries_list.erase(it);
- continue;
- }
+
bool is_low_wartermark = false;
bool is_high_wartermark = false;
-
wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+
bool has_changed_hard_limit = false;
+ int64_t flushed_memtable_bytes = 0;
// If the query is paused because its limit exceed the query itself's memlimit, then just spill disk.
// The query's memlimit is set using slot mechanism and its value is set using the user settings, not
// by weighted value. So if reserve failed, then it is actually exceed limit.
@@ -317,29 +299,12 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
- bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit();
- // Only deal with non overcommit workload group.
- if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit &&
- !query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
- // Soft limit wg will only reserve failed when process limit exceed. But in some corner case,
- // when reserve, the wg is hard limit, the query reserve failed, but when this loop run
- // the wg is converted to soft limit.
- // So that should resume the query.
- LOG(WARNING) << "query: " << print_id(query_ctx->query_id())
- << " reserve memory failed, but workload group not converted to hard "
- "limit, it should not happen, resume it again. paused reason: "
- << query_ctx->paused_reason();
- query_ctx->set_memory_sufficient(true);
- query_it = queries_list.erase(query_it);
- continue;
- }
if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit);
// Streamload, kafka load, group commit will never have query memory exceeded error because
// their query limit is very large.
- bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_,
- query_ctx->paused_reason());
+ bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_,
+ query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -348,7 +313,21 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
continue;
}
} else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
- CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit);
+ // Only deal with non overcommit workload group.
+ if (wg->enable_memory_overcommit()) {
+ // Soft limit wg will only reserve failed when process limit exceed. But in some corner case,
+ // when reserve, the wg is hard limit, the query reserve failed, but when this loop run
+ // the wg is converted to soft limit.
+ // So that should resume the query.
+ LOG(WARNING)
+ << "query: " << print_id(query_ctx->query_id())
+ << " reserve memory failed because exceed workload group memlimit, it "
+ "should not happen, resume it again. paused reason: "
+ << query_ctx->paused_reason();
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
// check if the reserve is too large, if it is too large,
// should set the query's limit only.
// Check the query's reserve with expected limit.
@@ -358,13 +337,28 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
query_ctx->set_memory_sufficient(true);
LOG(INFO) << "workload group memory reserve failed because "
<< query_ctx->debug_string() << " reserve size "
- << query_it->reserve_size_ << " is too large, set hard limit to "
- << query_ctx->expected_mem_limit() << " and resume running.";
+ << PrettyPrinter::print_bytes(query_it->reserve_size_)
+ << " is too large, set hard limit to "
+ << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit())
+ << " and resume running.";
query_it = queries_list.erase(query_it);
continue;
}
+ if (flushed_memtable_bytes <= 0) {
+ flushed_memtable_bytes = flush_memtable_from_current_group_(
+ query_ctx, wg, query_it->reserve_size_);
+ }
+ if (flushed_memtable_bytes > 0) {
+ // Flushed some memtable, just wait flush finished and not do anything more.
+ wg->enable_load_buffer_limit(true);
+ ++query_it;
+ continue;
+ } else {
+ // If could not revoke memory by flush memtable, then disable load buffer limit
+ wg->enable_load_buffer_limit(false);
+ }
if (!has_changed_hard_limit) {
- update_queries_limit(wg, true);
+ update_queries_limit_(wg, true);
has_changed_hard_limit = true;
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " reserve memory failed due to workload group memory exceed, "
@@ -372,42 +366,6 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
"so that other query will reduce their memory. wg: "
<< wg->debug_string();
}
- // If there are a lot of memtable memory, then wait them flush finished.
- MemTableMemoryLimiter* memtable_limiter =
- doris::ExecEnv::GetInstance()->memtable_memory_limiter();
- // Not use memlimit, should use high water mark.
- int64_t memtable_active_bytes = 0;
- int64_t memtable_queue_bytes = 0;
- int64_t memtable_flush_bytes = 0;
- wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes,
- &memtable_flush_bytes);
- // TODO: should add a signal in memtable limiter to prevent new batch
- // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory.
- // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value
- // should be larged, if the group is optimized for query, then the value should be smaller.
- int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
- if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
- max_wg_memtable_bytes) {
- // There are many table in flush queue, just waiting them flush finished.
- if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
- LOG_EVERY_T(INFO, 60)
- << wg->name() << " load memtable size is: " << memtable_active_bytes
- << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes
- << ", load buffer limit is: " << max_wg_memtable_bytes
- << " wait for flush finished to release more memory";
- continue;
- } else {
- // Flush some memtables(currently written) to flush queue.
- memtable_limiter->flush_workload_group_memtables(
- wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6));
- LOG_EVERY_T(INFO, 60)
- << wg->name() << " load memtable size is: " << memtable_active_bytes
- << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes
- << ", flush some active memtable to revoke memory";
- continue;
- }
- }
// Should not put the query back to task scheduler immediately, because when wg's memory not sufficient,
// and then set wg's flag, other query may not free memory very quickly.
if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
@@ -415,13 +373,12 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " will be resume.";
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
+ continue;
} else {
++query_it;
+ continue;
}
- continue;
} else {
- // PROCESS Reserve logic using hard limit, if reached here, should try to spill or cancel.
- // GC Logic also work at hard limit, so GC may cancel some query and could not spill here.
// If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata
// used too much memory. Should clean all cache here.
// 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory.
@@ -437,34 +394,43 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
"to 0 now";
}
if (query_it->cache_ratio_ < 0.001) {
- if (query_it->any_wg_exceed_limit_) {
- if (wg->enable_memory_overcommit()) {
- if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
- resume_after_gc.push_back(query_ctx);
+ // 1. Check if could revoke some memory from memtable
+ if (flushed_memtable_bytes <= 0) {
+ flushed_memtable_bytes = flush_memtable_from_current_group_(
+ query_ctx, wg, query_it->reserve_size_);
+ }
+ if (flushed_memtable_bytes > 0) {
+ // Flushed some memtable, just wait flush finished and not do anything more.
+ ++query_it;
+ continue;
+ }
+ // TODO should wait here to check if the process has release revoked_size memory and then continue.
+ if (!has_revoked_from_other_group) {
+ int64_t revoked_size = revoke_memory_from_other_group_(
+ query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_);
+ if (revoked_size > 0) {
+ has_revoked_from_other_group = true;
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ // Do not care if the revoked_size > reserve size, and try to run again.
+ continue;
+ } else {
+ bool spill_res = handle_single_query_(
+ query_ctx, query_it->reserve_size_, query_ctx->paused_reason());
+ if (spill_res) {
query_it = queries_list.erase(query_it);
continue;
} else {
++query_it;
continue;
}
- } else {
- // current workload group is hard limit, should not wait other wg with
- // soft limit, just cancel
- resume_after_gc.push_back(query_ctx);
- query_it = queries_list.erase(query_it);
- continue;
}
} else {
- // TODO: Find other exceed limit workload group and cancel query.
- bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_,
- query_ctx->paused_reason());
- if (!spill_res) {
- ++query_it;
- continue;
- } else {
- query_it = queries_list.erase(query_it);
- continue;
- }
+ // If any query is cancelled during process limit stage, should resume other query and
+ // do not do any check now.
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ continue;
}
}
if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
@@ -479,68 +445,148 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
++query_it;
}
}
+ // No need to keep waiting for memtable flush; when below the low watermark, disable the load buffer limit
+ if (flushed_memtable_bytes <= 0 && !is_low_wartermark) {
+ wg->enable_load_buffer_limit(false);
+ }
- // Finished deal with one workload group, and should deal with next one.
- ++it;
- }
- // TODO minor GC to release some query
- if (!resume_after_gc.empty()) {
+ if (queries_list.empty()) {
+ it = _paused_queries_list.erase(it);
+ continue;
+ } else {
+ // Finished deal with one workload group, and should deal with next one.
+ ++it;
+ }
}
- for (auto resume_it = resume_after_gc.begin(); resume_it != resume_after_gc.end();
- ++resume_it) {
- auto query_ctx = resume_it->lock();
- if (query_ctx != nullptr) {
- query_ctx->set_memory_sufficient(true);
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(
+ std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t need_free_mem) {
+ // If there is a lot of memtable memory, then wait for it to finish flushing.
+ MemTableMemoryLimiter* memtable_limiter =
+ doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+ int64_t memtable_active_bytes = 0;
+ int64_t memtable_queue_bytes = 0;
+ int64_t memtable_flush_bytes = 0;
+ memtable_limiter->get_workload_group_memtable_usage(
+ wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes);
+ // TODO: should add a signal in memtable limiter to prevent new batch
+ // For example, streamload will not reserve much memory, but it will occupy a lot of memtable memory.
+ // TODO: 0.2 should be a workload group property. For example, if the group is optimized for load, then the value
+ // should be larger; if the group is optimized for query, then the value should be smaller.
+ int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
+ if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+ max_wg_memtable_bytes) {
+ // There are many memtables in the flush queue, just wait for them to finish flushing.
+ if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+ LOG_EVERY_T(INFO, 60) << wg->name()
+ << " load memtable size is: " << memtable_active_bytes << ", "
+ << memtable_queue_bytes << ", " << memtable_flush_bytes
+ << ", load buffer limit is: " << max_wg_memtable_bytes
+ << " wait for flush finished to release more memory";
+ return memtable_queue_bytes + memtable_flush_bytes;
+ } else {
+ // Flush some memtables(currently written) to flush queue.
+ memtable_limiter->flush_workload_group_memtables(
+ wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6));
+ LOG_EVERY_T(INFO, 60) << wg->name()
+ << " load memtable size is: " << memtable_active_bytes << ", "
+ << memtable_queue_bytes << ", " << memtable_flush_bytes
+ << ", flush some active memtable to revoke memory";
+ return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes -
+ (int64_t)(max_wg_memtable_bytes * 0.6);
}
}
+ return 0;
}
-// streamload, kafka routine load, group commit
-// insert into select
-// select
+int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
+ bool hard_limit, int64_t need_free_mem) {
+ int64_t total_freed_mem = 0;
+ std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("RevokeMemory");
+ // 1. memtable like memory
+ // 2. query exceed workload group limit
+ int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, profile.get());
+ total_freed_mem += freed_mem;
+ // The revoke process may kill current requestor, so should return now.
+ if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+ return total_freed_mem;
+ }
+ if (hard_limit) {
+ freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem,
+ doris::QUERY_MIN_MEMORY, profile.get());
+ } else {
+ freed_mem = cancel_top_query_in_overcommit_group_(
+ need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(),
+ profile.get());
+ }
+ total_freed_mem += freed_mem;
+ // The revoke process may kill current requestor, so should return now.
+ if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+ return total_freed_mem;
+ }
+ return total_freed_mem;
+}
-void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- // If there is only one workload group and it is overcommit, then do nothing.
- // And should also start MinorGC logic.
- if (_workload_groups.size() == 1) {
- return;
+// Revoke memory from a workload group that exceeds its limit. For example, if the wg's limit is
+// 10g but it used 12g, then should revoke 2g from the group.
+int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor,
+ int64_t need_free_mem,
+ RuntimeProfile* profile) {
+ int64_t total_freed_mem = 0;
+ // 1. check memtable usage, and try to free them.
+ int64_t freed_mem = revoke_memtable_from_overcommited_groups_(need_free_mem, profile);
+ total_freed_mem += freed_mem;
+ // The revoke process may kill current requestor, so should return now.
+ if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+ return total_freed_mem;
}
- // soft_limit - 10%, will change workload group to hard limit.
- // soft limit, process memory reserve failed.
- // hard limit, FullGC will kill query randomly.
- if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
- (int64_t)(MemInfo::mem_limit() * 0.1))) {
- for (auto& [wg_id, wg] : _workload_groups) {
- if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
- wg->change_to_hard_limit(true);
- LOG(INFO) << "Process memory usage + 10% will exceed soft limit, change all "
- "workload "
- "group with overcommit to hard limit now. "
- << wg->debug_string();
- }
+ // 2. Cancel top usage query, one by one
+ std::map<WorkloadGroupPtr, int64_t> wg_mem_usage;
+ using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
+ auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
+ return left.second < right.second;
+ };
+ std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, decltype(cmp)> heap(cmp);
+ {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
+ heap.emplace(iter->second, iter->second->memory_used());
}
}
- // If current memory usage is below soft memlimit - 15%, then enable wg's overcommit
- if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
- (int64_t)(MemInfo::mem_limit() * 0.15))) {
- for (auto& [wg_id, wg] : _workload_groups) {
- if (wg->enable_memory_overcommit() && wg->has_changed_to_hard_limit()) {
- wg->change_to_hard_limit(false);
- LOG(INFO) << "Process memory usage is lower than soft limit, enable all workload "
- "group overcommit now. "
- << wg->debug_string();
- }
- }
+ while (!heap.empty() && need_free_mem - total_freed_mem > 0 && !requestor->is_cancelled()) {
+ auto [wg, sort_mem] = heap.top();
+ heap.pop();
+ freed_mem = wg->free_overcommited_memory(need_free_mem - total_freed_mem, profile);
+ total_freed_mem += freed_mem;
}
+ return total_freed_mem;
}
-// If the query could release some memory, for example, spill disk, flush memtable then the return value is true.
+// If the memtables are too large, then flush them and wait for them to finish.
+int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem,
+ RuntimeProfile* profile) {
+ return 0;
+}
+
+// 1. Sort all memory limiters in all overcommit wgs, and cancel the top-usage task with the most memory.
+// 2. Maybe not valid because its memory does not exceed the limit.
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem,
+ int64_t lower_bound,
+ RuntimeProfile* profile) {
+ return 0;
+}
+
+// streamload, kafka routine load, group commit
+// insert into select
+// select
+
+// If the query could release some memory, for example, spill disk, then the return value is true.
// If the query could not release memory, then cancel the query, the return value is true.
-// If the query is not ready to do these tasks, it means just wait.
-bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_ctx,
- size_t size_to_reserve, Status paused_reason) {
- // TODO: If the query is an insert into select query, should consider memtable as revoke memory.
+// If the query is not ready to do these tasks, it means just wait, then return value is false.
+bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ctx,
+ size_t size_to_reserve, Status paused_reason) {
size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
@@ -556,6 +602,8 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
if (revocable_tasks.empty()) {
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
const auto limit = query_ctx->get_mem_limit();
+ // During the waiting time, another operator in the query may have finished and released
+ // a lot of memory, so we could run.
if ((memory_usage + size_to_reserve) < limit) {
LOG(INFO) << "query: " << query_id << ", usage(" <<
memory_usage << " + "
<< size_to_reserve << ") less than limit(" << limit
<< "), resume it.";
@@ -564,7 +612,8 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
             } else {
                 // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
                 query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                        "query({}) reserve memory failed, but could not find memory that could "
+                        "query({}) reserve memory failed, but could not find memory that "
+                        "could "
                         "release or spill to disk(usage:{}, limit: {})",
                         query_id, memory_usage, query_ctx->get_mem_limit()));
             }
@@ -584,7 +633,8 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
                   << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
         query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
                 "The query({}) reserved memory failed because process limit exceeded, and "
-                "there is no cache now. And could not find task to spill. Maybe you should set "
+                "there is no cache now. And could not find task to spill. Maybe you should "
+                "set "
                 "the workload group's limit to a lower value.",
                 query_id));
     }
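
[Aside, not part of the patch] The true/false contract spelled out in the comments above can be condensed into a standalone sketch: true means the pause was resolved (the query resumed, was asked to spill, or was cancelled), false means the query simply keeps waiting. QueryCtxSketch is a simplified stand-in; the real QueryContext and spill machinery are far richer:

    #include <cstddef>
    #include <string>

    struct QueryCtxSketch {
        size_t memory_usage = 0;
        size_t mem_limit = 0;
        size_t revocable_size = 0;
        bool has_running_task = false;
        bool cancelled = false;
        void cancel(const std::string& /*reason*/) { cancelled = true; }
    };

    // Returns true when the paused query was resumed, asked to spill, or
    // cancelled; false when it is not ready yet and should stay paused.
    bool handle_single_query_sketch(QueryCtxSketch& q, size_t size_to_reserve) {
        if (q.has_running_task) {
            return false; // revocable work still running; check again later
        }
        if (q.revocable_size == 0) {
            if (q.memory_usage + size_to_reserve < q.mem_limit) {
                return true; // enough headroom appeared; resume the query
            }
            q.cancel("reserve failed and no memory to release or spill");
            return true; // cancelled, which also resolves the pause
        }
        // Revocable memory exists: trigger spilling here (omitted) and resolve.
        return true;
    }

    int main() {
        QueryCtxSketch q;
        q.memory_usage = 800;
        q.mem_limit = 1000;
        return handle_single_query_sketch(q, 100) ? 0 : 1; // resumes: 800+100 < 1000
    }
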
@@ -595,21 +645,15 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
     return true;
 }
-void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit) {
+void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) {
     auto wg_mem_limit = wg->memory_limit();
-    auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
-    wg->set_weighted_memory_limit(wg_weighted_mem_limit);
     auto all_query_ctxs = wg->queries();
     bool is_low_wartermark = false;
     bool is_high_wartermark = false;
     wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
     int64_t wg_high_water_mark_limit =
             (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 1.0 / 100);
-    int64_t memtable_active_bytes = 0;
-    int64_t memtable_queue_bytes = 0;
-    int64_t memtable_flush_bytes = 0;
-    wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes);
-    int64_t memtable_usage = memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes;
+    int64_t memtable_usage = wg->load_mem_used();
     int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
     if (memtable_usage > wg->load_buffer_limit()) {
         wg_high_water_mark_except_load = wg_high_water_mark_limit - wg->load_buffer_limit();
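
[Aside, not part of the patch] A worked example of the carve-out just above, with illustrative numbers (the 90% figure matches the new SPILL_HIGH_WATERMARK_DEFAULT_VALUE later in this patch; the other values are made up):

    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t GB = 1024LL * 1024 * 1024;
        int64_t wg_mem_limit = 10 * GB;        // workload group memory limit
        int64_t high_water_mark_pct = 90;      // spill high watermark (percent)
        int64_t load_buffer_limit = 2 * GB;    // budget reserved for load memtables
        int64_t memtable_usage = 3 * GB;       // current load memtable usage

        int64_t high_limit = wg_mem_limit * high_water_mark_pct / 100; // 9 GB
        int64_t high_limit_except_load = high_limit;
        if (memtable_usage > load_buffer_limit) {
            // Loads overflow their buffer, so queries get a smaller watermark
            // and the difference is left to the memtables.
            high_limit_except_load = high_limit - load_buffer_limit; // 7 GB
        }
        std::cout << high_limit_except_load / GB << " GB\n"; // prints "7 GB"
        return 0;
    }
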
@@ -620,20 +664,23 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
     std::string debug_msg;
     if (is_high_wartermark || is_low_wartermark) {
         debug_msg = fmt::format(
-                "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
+                "\nWorkload Group {}: mem limit: {}, mem used: {}, "
                 "high water mark mem limit: {}, load memtable usage: {}, used ratio: {}",
                 wg->name(), PrettyPrinter::print(wg->memory_limit(), TUnit::BYTES),
                 PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
-                PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
                 PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
                 PrettyPrinter::print(memtable_usage, TUnit::BYTES),
-                (double)(wg->total_mem_used()) / wg_weighted_mem_limit);
+                (double)(wg->total_mem_used()) / wg_mem_limit);
     }
     // If the wg enable over commit memory, then it is no need to update query memlimit
-    if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
+    if (wg->enable_memory_overcommit()) {
         return;
     }
+    // If the low watermark has been reached, enable the load buffer limit
+    if (is_low_wartermark) {
+        wg->enable_load_buffer_limit(true);
+    }
int32_t total_used_slot_count = 0;
int32_t total_slot_count = wg->total_query_slot_count();
// calculate total used slot count
@@ -657,7 +704,7 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft limit
-        if (query_ctx->enable_query_slot_hard_limit()) {
+        if (!query_ctx->enable_mem_overcommit()) {
            if (total_slot_count < 1) {
                LOG(WARNING)
                        << "query " << print_id(query_ctx->query_id())
@@ -701,21 +748,4 @@ void WorkloadGroupMgr::stop() {
}
}
-void WorkloadGroupMgr::update_load_memtable_usage(
-        const std::map<uint64_t, MemtableUsage>& wg_memtable_usages) {
-    // Use readlock here, because it will not modify workload_groups
-    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) {
-        auto wg_usage = wg_memtable_usages.find(it->first);
-        if (wg_usage != wg_memtable_usages.end()) {
-            it->second->update_load_mem_usage(wg_usage->second.active_mem_usage,
-                                              wg_usage->second.queue_mem_usage,
-                                              wg_usage->second.flush_mem_usage);
-        } else {
-            // Not anything in memtable limiter, then set to 0
-            it->second->update_load_mem_usage(0, 0, 0);
-        }
-    }
-}
-
} // namespace doris
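
[Aside, not part of the patch] Both the deleted push-style updater above and the new heap-building code earlier take a std::shared_lock on _group_mutex, because they only read the group map; writers (group create/drop) take the exclusive lock. A minimal standalone sketch of that reader/writer pattern (a simplified registry, not the Doris class):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <mutex>
    #include <shared_mutex>

    class GroupRegistry {
    public:
        int64_t total_memory_used() {
            // Read lock: many readers may scan the map concurrently.
            std::shared_lock<std::shared_mutex> r_lock(_mutex);
            int64_t total = 0;
            for (const auto& [id, used] : _groups) {
                total += used;
            }
            return total;
        }
        void add_group(uint64_t id, int64_t used) {
            // Write lock: mutating the map excludes all readers and writers.
            std::unique_lock<std::shared_mutex> w_lock(_mutex);
            _groups[id] = used;
        }
    private:
        std::shared_mutex _mutex;
        std::map<uint64_t, int64_t> _groups;
    };

    int main() {
        GroupRegistry reg;
        reg.add_group(1, 100);
        reg.add_group(2, 250);
        std::cout << reg.total_memory_used() << "\n"; // prints 350
        return 0;
    }
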
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index c8e1ee7adf7..065528c66ec 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -105,14 +105,20 @@ public:
void handle_paused_queries();
-    void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& wg_memtable_usages);
-
private:
-    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
-                             Status paused_reason);
-    void handle_non_overcommit_wg_paused_queries();
-    void handle_overcommit_wg_paused_queries();
-    void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
+    int64_t cancel_top_query_in_overcommit_group_(int64_t need_free_mem, int64_t lower_bound,
+                                                  RuntimeProfile* profile);
+    int64_t flush_memtable_from_current_group_(std::shared_ptr<QueryContext> requestor,
+                                               WorkloadGroupPtr wg, int64_t need_free_mem);
+    bool handle_single_query_(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
+                              Status paused_reason);
+    int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
+                                            bool hard_limit, int64_t need_free_mem);
+    int64_t revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor,
+                                        int64_t need_free_mem, RuntimeProfile* profile);
+    int64_t revoke_memtable_from_overcommited_groups_(int64_t need_free_mem,
+                                                      RuntimeProfile* profile);
+    void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit);
private:
std::shared_mutex _group_mutex;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 2135e4729fa..894b2b00eeb 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -503,7 +503,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
-        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
+                _state->workload_group());
}
SCOPED_TIMER(_write_memtable_timer);
auto st = delta_writer->write(block.get(), rows.row_idxes);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e9d14653e0d..7c8c72020d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -630,7 +630,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String QUERY_SLOT_COUNT = "query_slot_count";
-    public static final String ENABLE_QUERY_SLOT_HARD_LIMIT = "enable_query_slot_hard_limit";
+ public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit";
public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
@@ -709,7 +709,7 @@ public class SessionVariable implements Serializable, Writable {
public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
// max memory used on every backend.
- @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
+ @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
public long maxExecMemByte = 2147483648L;
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
@@ -811,10 +811,10 @@ public class SessionVariable implements Serializable, Writable {
}
}
-    @VariableMgr.VarAttr(name = ENABLE_QUERY_SLOT_HARD_LIMIT, needForward = true, description = {
-            "是否通过硬限的方式来计算每个Slot的内存资源",
-            "Whether to calculate the memory resources of each Slot by hard limit"})
-    public boolean enableQuerySlotHardLimit = false;
+    @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true, description = {
+            "是否通过硬限的方式来计算每个Query的内存资源",
+            "Whether to calculate the memory resources of each query by hard limit"})
+    public boolean enableMemOvercommit = true;
@VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
public int maxColumnReaderNum = 20000;
@@ -3829,7 +3829,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setQuerySlotCount(wgQuerySlotCount);
- tResult.setEnableQuerySlotHardLimit(enableQuerySlotHardLimit);
+ tResult.setEnableMemOvercommit(enableMemOvercommit);
tResult.setKeepCarriageReturn(keepCarriageReturn);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 42929be609b..b428a058277 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -57,6 +57,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
     public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
+ public static final String LOAD_BUFFER_RATIO = "load_buffer_ratio";
+
public static final String MAX_CONCURRENCY = "max_concurrency";
public static final String MAX_QUEUE_SIZE = "max_queue_size";
@@ -87,10 +89,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
+            .add(LOAD_BUFFER_RATIO).build();
- public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
- public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
+ public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
+ public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
+ public static final int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
@SerializedName(value = "id")
private long id;
@@ -126,6 +130,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             this.memoryLimitPercent = Double.parseDouble(
                     memoryLimitString.substring(0, memoryLimitString.length() - 1));
}
+
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String loadBufLimitStr = properties.get(LOAD_BUFFER_RATIO);
+            if (loadBufLimitStr.endsWith("%")) {
+                loadBufLimitStr = loadBufLimitStr.substring(0, loadBufLimitStr.length() - 1);
+            }
+            this.properties.put(LOAD_BUFFER_RATIO, loadBufLimitStr);
+        } else {
+            this.properties.put(LOAD_BUFFER_RATIO, LOAD_BUFFER_RATIO_DEFAULT_VALUE + "");
+        }
+
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             properties.put(ENABLE_MEMORY_OVERCOMMIT,
                     properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
}
@@ -256,6 +271,26 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
}
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String memoryLimit = properties.get(LOAD_BUFFER_RATIO);
+            if (!memoryLimit.endsWith("%")) {
+                throw new DdlException(LOAD_BUFFER_RATIO + " " + memoryLimit
+                        + " requires a percentage and ends with a '%'");
+            }
+            String memLimitErr = LOAD_BUFFER_RATIO + " " + memoryLimit
+                    + " requires a positive int number.";
+            try {
+                if (Integer.parseInt(memoryLimit.substring(0, memoryLimit.length() - 1)) < 0) {
+                    throw new DdlException(memLimitErr);
+                }
+            } catch (NumberFormatException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(memLimitErr, e);
+                }
+                throw new DdlException(memLimitErr);
+            }
+        }
+
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             String value = properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase();
if (!("true".equals(value) || "false".equals(value))) {
@@ -482,6 +517,12 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
row.add("-1");
} else if (MEMORY_LIMIT.equals(key) &&
!properties.containsKey(key)) {
row.add("0%");
+ } else if (LOAD_BUFFER_RATIO.equals(key)) {
+ if (properties.containsKey(key)) {
+ row.add(properties.get(key) + "%");
+ } else {
+ row.add(LOAD_BUFFER_RATIO_DEFAULT_VALUE + "%");
+ }
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) &&
!properties.containsKey(key)) {
row.add("true");
} else if (SCAN_THREAD_NUM.equals(key) &&
!properties.containsKey(key)) {
@@ -568,6 +609,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
if (memLimitStr != null) {
tWorkloadGroupInfo.setMemLimit(memLimitStr);
}
+        String loadBufferRatioStr = properties.get(LOAD_BUFFER_RATIO);
+        if (loadBufferRatioStr != null) {
+            tWorkloadGroupInfo.setLoadBufferRatio(Integer.parseInt(loadBufferRatioStr));
+        }
String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
if (memOvercommitStr != null) {
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
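
[Aside, not part of the patch] For context on what the FE-side ratio means to the BE (an assumption drawn from the property's name and default, not from code shown in this diff): load_buffer_ratio is read as a percentage of the workload group's memory limit that is reserved for load memtables, so the 20% default on a 10 GB group would reserve 2 GB. A hedged sketch of that conversion:

    #include <cstdint>
    #include <iostream>

    // Assumed conversion: ratio is a percentage of the group's memory limit.
    int64_t load_buffer_limit_sketch(int64_t wg_mem_limit, int32_t load_buffer_ratio) {
        return wg_mem_limit * load_buffer_ratio / 100;
    }

    int main() {
        const int64_t GB = 1024LL * 1024 * 1024;
        std::cout << load_buffer_limit_sketch(10 * GB, 20) / GB << " GB\n"; // 2 GB
        return 0;
    }
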
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 4acd4602432..fa4fedcc9bb 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -271,6 +271,7 @@ struct TWorkloadGroupInfo {
15: optional i64 remote_read_bytes_per_second
16: optional string tag
17: optional i32 total_query_slot_count
+ 18: optional i32 load_buffer_ratio
}
enum TWorkloadMetricType {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 75c3122b5bd..387d0265fdf 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -352,7 +352,7 @@ struct TQueryOptions {
// The minimum memory that an operator required to run.
137: optional i32 minimum_operator_memory_required_kb = 1024;
- 138: optional bool enable_query_slot_hard_limit = false;
+ 138: optional bool enable_mem_overcommit = true;
139: optional i32 query_slot_count = 0;
// For cloud, to control if the content would be written into file cache