This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0075df54ddb [workloadgroup](memory) flush memtable when memory is not 
enough (#54642)
0075df54ddb is described below

commit 0075df54ddb6073ddad59a844881d688c5471182
Author: yiguolei <[email protected]>
AuthorDate: Mon Aug 18 15:11:27 2025 +0800

    [workloadgroup](memory) flush memtable when memory is not enough (#54642)
    
    ### What problem does this PR solve?
    1. flush memtable when memory is not enough
    2. cancel the query that use more memory than min_memory_percent
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   1 +
 be/src/common/daemon.cpp                           |   2 +-
 ...chema_workload_group_resource_usage_scanner.cpp |   1 -
 be/src/olap/memtable_memory_limiter.cpp            |  79 +---
 be/src/olap/memtable_memory_limiter.h              |  21 +-
 be/src/runtime/load_channel_mgr.cpp                |   4 +-
 be/src/runtime/memory/global_memory_arbitrator.cpp |   1 -
 be/src/runtime/memory/global_memory_arbitrator.h   |   1 -
 be/src/runtime/memory/mem_tracker_limiter.h        |   2 -
 be/src/runtime/query_context.cpp                   |   2 +-
 be/src/runtime/workload_group/workload_group.cpp   |  50 ++-
 be/src/runtime/workload_group/workload_group.h     |  29 +-
 .../workload_group/workload_group_manager.cpp      | 475 ++++++++-------------
 .../workload_group/workload_group_manager.h        |  16 +-
 .../runtime/workload_management/memory_context.h   |   3 +-
 .../workload_management/query_task_controller.cpp  |   5 +-
 .../workload_management/query_task_controller.h    |   2 +-
 .../runtime/workload_management/task_controller.h  |   2 +
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   4 +-
 be/test/olap/memtable_memory_limiter_test.cpp      |   2 +-
 .../workload_group/workload_group_manager_test.cpp | 193 ++++++---
 .../java/org/apache/doris/catalog/SchemaTable.java |   1 -
 .../resource/workloadgroup/WorkloadGroup.java      |  38 +-
 .../resource/workloadgroup/WorkloadGroupMgr.java   |   1 -
 25 files changed, 389 insertions(+), 548 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5ab1d344b44..c7eb4136c52 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1391,6 +1391,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 // paused query in queue timeout(ms) will be resumed or canceled
 DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
 
+DEFINE_Int64(wait_cancel_release_memory_ms, "5000");
+
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
 DEFINE_mBool(force_azure_blob_global_endpoint, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4595f416c49..5f5900165b0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1447,6 +1447,7 @@ DECLARE_mInt32(spill_gc_work_time_ms);
 DECLARE_Int32(spill_io_thread_pool_thread_num);
 DECLARE_Int32(spill_io_thread_pool_queue_size);
 DECLARE_Int64(spill_in_paused_queue_timeout_ms);
+DECLARE_Int64(wait_cancel_release_memory_ms);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 4651d1de36e..a5f01ed1975 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -348,7 +348,7 @@ void Daemon::memory_maintenance_thread() {
 
         // step 6. Refresh weighted memory ratio of workload groups.
         doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
-        
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
+        
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_memory_state();
 
         // step 7: handle paused queries(caused by memory insufficient)
         
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
diff --git 
a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
index b89823f78d2..bae6d2dd51d 100644
--- 
a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
+++ 
b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
@@ -38,7 +38,6 @@ std::vector<SchemaScanner::ColumnDesc> 
SchemaBackendWorkloadGroupResourceUsage::
         {"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
         {"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
         {"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
-        {"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
 };
 
 
SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index a55b7fe6f59..c8caf5194d6 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -119,43 +119,11 @@ int64_t MemTableMemoryLimiter::_need_flush() {
     return need_flush - _queue_mem_usage - _flush_mem_usage;
 }
 
-void MemTableMemoryLimiter::handle_workload_group_memtable_flush(
-        WorkloadGroupPtr wg, std::function<bool()> cancel_check) {
-    // It means some query is pending on here to flush memtable and to 
continue running.
-    // So that should wait here.
-    // Wait at most 3s, because this code is not aware cancel flag. If the 
load task is cancelled
-    // Should releae memory quickly.
-    using namespace std::chrono_literals;
-    int32_t max_sleep_times = 30;
-    int32_t sleep_times = max_sleep_times;
-    MonotonicStopWatch timer;
-    timer.start();
-    while (wg != nullptr && wg->enable_write_buffer_limit() && 
wg->exceed_write_buffer_limit() &&
-           sleep_times > 0) {
-        if (cancel_check && cancel_check()) {
-            LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
-                      << (wg == nullptr ? "null" : wg->debug_string());
-            return;
-        }
-        std::this_thread::sleep_for(100ms);
-        --sleep_times;
-    }
-    if (sleep_times < max_sleep_times) {
-        timer.stop();
-        VLOG_DEBUG << "handle_workload_group_memtable_flush waited "
-                   << PrettyPrinter::print(timer.elapsed_time(), 
TUnit::TIME_NS)
-                   << ", wg: " << wg->debug_string();
-    }
-    // Check process memory again.
-    _handle_memtable_flush(wg, cancel_check);
-}
-
-void MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
-                                                   std::function<bool()> 
cancel_check) {
+void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> 
cancel_check) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     do {
-        
DBUG_EXECUTE_IF("MemTableMemoryLimiter._handle_memtable_flush.limit_reached", {
+        
DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
             LOG(INFO) << "debug memtable limit reached";
             break;
         });
@@ -176,8 +144,7 @@ void 
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
             }
         }
         if (cancel_check && cancel_check()) {
-            LOG(INFO) << "cancelled when waiting for memtable flush, wg: "
-                      << (wg == nullptr ? "null" : wg->debug_string());
+            LOG(INFO) << "cancelled when waiting for memtable flush";
             return;
         }
         first = false;
@@ -192,15 +159,14 @@ void 
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
                       << ", active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
                       << ", queue: " << 
PrettyPrinter::print_bytes(_queue_mem_usage)
                       << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage)
-                      << ", need flush: " << 
PrettyPrinter::print_bytes(need_flush)
-                      << ", wg: " << (wg ? wg->debug_string() : "null");
+                      << ", need flush: " << 
PrettyPrinter::print_bytes(need_flush);
             if (VLOG_DEBUG_IS_ON) {
                 auto log_str = doris::ProcessProfile::instance()
                                        ->memory_profile()
                                        ->process_memory_detail_str();
                 LOG_LONG_STRING(INFO, log_str);
             }
-            _flush_active_memtables(0, need_flush);
+            _flush_active_memtables(need_flush);
         }
     } while (_hard_limit_reached() && !_load_usage_low());
     g_memtable_memory_limit_waiting_threads << -1;
@@ -216,37 +182,11 @@ void 
MemTableMemoryLimiter::_handle_memtable_flush(WorkloadGroupPtr wg,
                   << ", memtable writers num: " << _writers.size()
                   << ", active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
                   << ", queue: " << 
PrettyPrinter::print_bytes(_queue_mem_usage)
-                  << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage)
-                  << ", wg: " << (wg ? wg->debug_string() : "null.");
-    }
-}
-
-int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, 
int64_t need_flush) {
-    std::unique_lock<std::mutex> l(_lock);
-    return _flush_active_memtables(wg_id, need_flush);
-}
-
-void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, 
int64_t* active_bytes,
-                                                              int64_t* 
queue_bytes,
-                                                              int64_t* 
flush_bytes) {
-    std::unique_lock<std::mutex> l(_lock);
-    *active_bytes = 0;
-    *queue_bytes = 0;
-    *flush_bytes = 0;
-    for (auto it = _writers.begin(); it != _writers.end(); ++it) {
-        if (auto writer = it->lock()) {
-            // If wg id is specified, but wg id not match, then not need flush
-            if (writer->workload_group_id() != wg_id) {
-                continue;
-            }
-            *active_bytes += writer->active_memtable_mem_consumption();
-            *queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
-            *flush_bytes += writer->mem_consumption(MemType::FLUSH);
-        }
+                  << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage);
     }
 }
 
-int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t 
need_flush) {
+int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
     if (need_flush <= 0) {
         return 0;
     }
@@ -278,10 +218,7 @@ int64_t 
MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t n
         if (w == nullptr) {
             continue;
         }
-        // If wg id is specified, but wg id not match, then not need flush
-        if (wg_id != 0 && w->workload_group_id() != wg_id) {
-            continue;
-        }
+
         int64_t mem = w->active_memtable_mem_consumption();
         if (mem < sort_mem * 0.9) {
             // if the memtable writer just got flushed, don't flush it again
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index 038a484273c..34dcb2b06b4 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -40,13 +40,11 @@ public:
 
     Status init(int64_t process_mem_limit);
 
-    void handle_workload_group_memtable_flush(WorkloadGroupPtr wg,
-                                              std::function<bool()> 
cancel_check);
-
-    int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes);
-
-    void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* 
active_bytes,
-                                           int64_t* queue_bytes, int64_t* 
flush_bytes);
+    // check if the total mem consumption exceeds limit.
+    // If yes, it will flush memtable to try to reduce memory consumption.
+    // Every write operation will call this API to check if need flush 
memtable OR hang
+    // when memory is not available.
+    void handle_memtable_flush(std::function<bool()> cancel_check);
 
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
@@ -57,12 +55,6 @@ public:
     int64_t mem_usage() const { return _mem_usage; }
 
 private:
-    // check if the total mem consumption exceeds limit.
-    // If yes, it will flush memtable to try to reduce memory consumption.
-    // Every write operation will call this API to check if need flush 
memtable OR hang
-    // when memory is not available.
-    void _handle_memtable_flush(WorkloadGroupPtr wg, std::function<bool()> 
cancel_check);
-
     static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
     static inline int64_t _process_used_mem_more_than_soft_mem_limit();
 
@@ -70,9 +62,8 @@ private:
     bool _hard_limit_reached();
     bool _load_usage_low();
     int64_t _need_flush();
-    int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
+    int64_t _flush_active_memtables(int64_t need_flush);
     void _refresh_mem_tracker();
-
     std::mutex _lock;
     std::condition_variable _hard_limit_end_cond;
     int64_t _mem_usage = 0;
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 242e43eabab..0bb352d2feb 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -152,8 +152,8 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBlockRequest& request,
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
         SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
-                channel->workload_group(), [channel]() { return 
channel->is_cancelled(); });
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+                [channel]() { return channel->is_cancelled(); });
         if (channel->is_cancelled()) {
             return Status::Cancelled("LoadChannel has been cancelled: {}.", 
load_id.to_string());
         }
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp 
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 5aed0721fff..8527e719882 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -62,7 +62,6 @@ std::atomic<double> 
GlobalMemoryArbitrator::last_periodic_refreshed_cache_capaci
 std::atomic<double> 
GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted {1};
 // The value that take affect
 std::atomic<double> 
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
-std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit 
{false};
 std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
 std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
 std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify 
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h 
b/be/src/runtime/memory/global_memory_arbitrator.h
index 7ac98b1c1f2..ae685f3fe36 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -166,7 +166,6 @@ public:
     static std::atomic<double> 
last_memory_exceeded_cache_capacity_adjust_weighted;
     // The value that take affect
     static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
-    static std::atomic<bool> any_workload_group_exceed_limit;
 
     static void notify_cache_adjust_capacity() {
         cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 7dd66eafe1b..da9285e5255 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -220,8 +220,6 @@ public:
     static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* 
profile, int top_num);
     static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
 
-    int64_t write_buffer_size() const { return _write_tracker->consumption(); }
-
     std::shared_ptr<MemTrackerLimiter> write_tracker() { return 
_write_tracker; }
 
     void print_log_usage(const std::string& msg);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c9955c25420..ece5ebb25fc 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -181,7 +181,7 @@ void QueryContext::_init_resource_context() {
 }
 
 void QueryContext::init_query_task_controller() {
-    _resource_ctx->set_task_controller(QueryTaskController::create(this));
+    
_resource_ctx->set_task_controller(QueryTaskController::create(shared_from_this()));
     _resource_ctx->task_controller()->set_task_id(_query_id);
     _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
     
_resource_ctx->task_controller()->set_query_type(_query_options.query_type);
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index e0e58ba9c41..5cf23ca4c41 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -56,7 +56,6 @@ const static int MEMORY_LOW_WATERMARK_DEFAULT_VALUE = 80;
 const static int MEMORY_HIGH_WATERMARK_DEFAULT_VALUE = 95;
 // This is a invalid value, and should ignore this value during usage
 const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
-const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
 
 WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info)
         : _id(wg_info.id),
@@ -69,7 +68,6 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info)
           _max_memory_percent(wg_info.max_memory_percent),
           _memory_low_watermark(wg_info.memory_low_watermark),
           _memory_high_watermark(wg_info.memory_high_watermark),
-          _load_buffer_ratio(wg_info.write_buffer_ratio),
           _scan_thread_num(wg_info.scan_thread_num),
           _max_remote_scan_thread_num(wg_info.max_remote_scan_thread_num),
           _min_remote_scan_thread_num(wg_info.min_remote_scan_thread_num),
@@ -82,6 +80,10 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& 
wg_info)
         _scan_io_throttle_map[data_dir.path] = 
std::make_shared<IOThrottle>(data_dir.metric_name);
     }
     _remote_scan_io_throttle = std::make_shared<IOThrottle>();
+    if (_max_memory_percent > 0) {
+        _min_memory_limit = static_cast<int64_t>(
+                static_cast<double>(_memory_limit * _min_memory_percent) / 
_max_memory_percent);
+    }
 
     _wg_metrics = std::make_shared<WorkloadGroupMetrics>(this);
 }
@@ -125,20 +127,15 @@ std::string WorkloadGroup::_memory_debug_string() const {
     auto mem_used_ratio_int = (int64_t)(mem_used_ratio * 100 + 0.5);
     mem_used_ratio = (double)mem_used_ratio_int / 100;
     return fmt::format(
-            "min_memory_percent = {}% , max_memory_percent = {}% , 
memory_limit = {}B, "
+            "min_memory_percent = {}% , max_memory_percent = {}% , 
memory_limit = {}B, " // add a blackspace after % to avoid log4j format bugs
             "slot_memory_policy = {}, total_query_slot_count = {}, "
             "memory_low_watermark = {}, memory_high_watermark = {}, "
-            "enable_write_buffer_limit = {}, write_buffer_ratio = {}% , " // 
add a blackspace after % to avoid log4j format bugs
-            "write_buffer_limit = {}, "
-            "mem_used_ratio = {}, total_mem_used = {}(write_buffer_size = {}, "
+            "mem_used_ratio = {}, total_mem_used = {}, "
             "wg_refresh_interval_memory_growth = {}",
             _min_memory_percent, _max_memory_percent,
             PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
-            _total_query_slot_count, _memory_low_watermark, 
_memory_high_watermark,
-            _enable_write_buffer_limit, _load_buffer_ratio,
-            PrettyPrinter::print(write_buffer_limit(), TUnit::BYTES), 
mem_used_ratio,
+            _total_query_slot_count, _memory_low_watermark, 
_memory_high_watermark, mem_used_ratio,
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
-            PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES));
 }
 
@@ -177,8 +174,12 @@ void WorkloadGroup::check_and_update(const 
WorkloadGroupInfo& wg_info) {
             _scan_bytes_per_second = wg_info.read_bytes_per_second;
             _remote_scan_bytes_per_second = 
wg_info.remote_read_bytes_per_second;
             _total_query_slot_count = wg_info.total_query_slot_count;
-            _load_buffer_ratio = wg_info.write_buffer_ratio;
             _slot_mem_policy = wg_info.slot_mem_policy;
+            if (_max_memory_percent > 0) {
+                _min_memory_limit = static_cast<int64_t>(
+                        static_cast<double>(_memory_limit * 
_min_memory_percent) /
+                        _max_memory_percent);
+            }
         } else {
             return;
         }
@@ -188,7 +189,6 @@ void WorkloadGroup::check_and_update(const 
WorkloadGroupInfo& wg_info) {
 // MemtrackerLimiter is not removed during query context release, so that 
should remove it here.
 int64_t WorkloadGroup::refresh_memory_usage() {
     int64_t fragment_used_memory = 0;
-    int64_t write_buffer_size = 0;
     {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         for (const auto& pair : _resource_ctxs) {
@@ -198,13 +198,11 @@ int64_t WorkloadGroup::refresh_memory_usage() {
             }
             DCHECK(resource_ctx->memory_context()->mem_tracker() != nullptr);
             fragment_used_memory += 
resource_ctx->memory_context()->current_memory_bytes();
-            write_buffer_size += 
resource_ctx->memory_context()->mem_tracker()->write_buffer_size();
         }
     }
 
-    _total_mem_used = fragment_used_memory + write_buffer_size;
+    _total_mem_used = fragment_used_memory;
     _wg_metrics->update_memory_used_bytes(_total_mem_used);
-    _write_buffer_size = write_buffer_size;
     // reserve memory is recorded in the query mem tracker
     // and _total_mem_used already contains all the current reserve memory.
     // so after refreshing _total_mem_used, reset 
_wg_refresh_interval_memory_growth.
@@ -228,6 +226,21 @@ void WorkloadGroup::do_sweep() {
     }
 }
 
+#ifdef BE_TEST
+void WorkloadGroup::clear_cancelled_resource_ctx() {
+    // Clear resource context that is registered during add_resource_ctx
+    std::unique_lock<std::shared_mutex> wlock(_mutex);
+    for (auto iter = _resource_ctxs.begin(); iter != _resource_ctxs.end();) {
+        auto ctx = iter->second.lock();
+        if (ctx != nullptr && ctx->task_controller()->is_cancelled()) {
+            iter = _resource_ctxs.erase(iter);
+        } else {
+            iter++;
+        }
+    }
+}
+#endif
+
 int64_t WorkloadGroup::revoke_memory(int64_t need_free_mem, const std::string& 
revoke_reason,
                                      RuntimeProfile* profile) {
     if (need_free_mem <= 0) {
@@ -447,12 +460,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
         total_query_slot_count = tworkload_group_info.total_query_slot_count;
     }
 
-    // 17 load buffer memory limit
-    int write_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE;
-    if (tworkload_group_info.__isset.write_buffer_ratio) {
-        write_buffer_ratio = tworkload_group_info.write_buffer_ratio;
-    }
-
     // 18 slot memory policy
     TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::NONE;
     if (tworkload_group_info.__isset.slot_memory_policy) {
@@ -476,7 +483,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .remote_read_bytes_per_second = remote_read_bytes_per_second,
             .total_query_slot_count = total_query_slot_count,
             .slot_mem_policy = slot_mem_policy,
-            .write_buffer_ratio = write_buffer_ratio,
             .pipeline_exec_thread_num = exec_thread_num,
             .max_flush_thread_num = max_flush_thread_num,
             .min_flush_thread_num = min_flush_thread_num};
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 800c92d05c3..2677596b82b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -61,7 +61,7 @@ class WorkloadGroup : public 
std::enable_shared_from_this<WorkloadGroup> {
     ENABLE_FACTORY_CREATOR(WorkloadGroup);
 
 public:
-    explicit WorkloadGroup(const WorkloadGroupInfo& wg_info);
+    WorkloadGroup(const WorkloadGroupInfo& wg_info);
 
     virtual ~WorkloadGroup();
 
@@ -77,19 +77,14 @@ public:
 
     int64_t total_mem_used() const { return _total_mem_used; }
 
-    int64_t write_buffer_size() const { return _write_buffer_size; }
-
-    void enable_write_buffer_limit(bool enable_limit) { 
_enable_write_buffer_limit = enable_limit; }
-
-    bool enable_write_buffer_limit() const { return 
_enable_write_buffer_limit; }
-
-    bool exceed_write_buffer_limit() const { return _write_buffer_size > 
write_buffer_limit(); }
-
     // make memory snapshots and refresh total memory used at the same time.
     int64_t refresh_memory_usage();
     int64_t memory_used();
 
     void do_sweep();
+#ifdef BE_TEST
+    void clear_cancelled_resource_ctx();
+#endif
 
     int memory_low_watermark() const {
         return _memory_low_watermark.load(std::memory_order_relaxed);
@@ -99,8 +94,6 @@ public:
         return _memory_high_watermark.load(std::memory_order_relaxed);
     }
 
-    void set_weighted_memory_ratio(double ratio);
-
     int total_query_slot_count() const {
         return _total_query_slot_count.load(std::memory_order_relaxed);
     }
@@ -141,6 +134,8 @@ public:
         return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
     }
 
+    int64_t min_memory_limit() const { return _min_memory_limit; }
+
     Status add_resource_ctx(TUniqueId query_id, 
std::shared_ptr<ResourceContext> resource_ctx) {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
         if (_is_shutdown) {
@@ -207,8 +202,6 @@ public:
     friend class WorkloadGroupMetrics;
     friend class WorkloadGroupMgr;
 
-    int64_t write_buffer_limit() const { return _memory_limit * 
_load_buffer_ratio / 100; }
-
     int64_t revoke_memory(int64_t need_free_mem, const std::string& 
revoke_reason,
                           RuntimeProfile* profile);
 
@@ -228,15 +221,12 @@ private:
     int64_t _version;
     std::atomic<int> _min_cpu_percent = 0;
     std::atomic<int> _max_cpu_percent = 100;
-    std::atomic<int64_t> _memory_limit = 1 << 30; // Default to 1GB
+    std::atomic<int64_t> _memory_limit = 1 << 30;     // Default to 1GB
+    std::atomic<int64_t> _min_memory_limit = 1 << 26; // Default to 64MB
     std::atomic<int> _min_memory_percent = 0;
     std::atomic<int> _max_memory_percent = 100;
     std::atomic<int> _memory_low_watermark;
     std::atomic<int> _memory_high_watermark;
-    // For example, load memtable, write to parquet.
-    // If the wg's memory reached high water mark, then the load buffer
-    // will be restricted to this limit.
-    int64_t _load_buffer_ratio = 0;
 
     std::atomic<int> _scan_thread_num;
     std::atomic<int> _max_remote_scan_thread_num;
@@ -246,9 +236,7 @@ private:
     std::atomic<int> _total_query_slot_count = 0;
     std::atomic<TWgSlotMemoryPolicy::type> _slot_mem_policy 
{TWgSlotMemoryPolicy::NONE};
 
-    std::atomic<bool> _enable_write_buffer_limit = false;
     std::atomic_int64_t _total_mem_used = 0; // bytes
-    std::atomic_int64_t _write_buffer_size = 0;
     std::atomic_int64_t _wg_refresh_interval_memory_growth;
     // means workload group is mark dropped
     // new query can not submit
@@ -292,7 +280,6 @@ struct WorkloadGroupInfo {
     const int64_t remote_read_bytes_per_second = -1;
     const int total_query_slot_count = 0;
     const TWgSlotMemoryPolicy::type slot_mem_policy = 
TWgSlotMemoryPolicy::NONE;
-    const int write_buffer_ratio = 0;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index e4dba22f668..5b2fd2efd6c 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -47,10 +47,9 @@ const static std::string INTERNAL_NORMAL_WG_NAME = "normal";
 const static uint64_t INTERNAL_NORMAL_WG_ID = 1;
 
 PausedQuery::PausedQuery(std::shared_ptr<ResourceContext> resource_ctx, double 
cache_ratio,
-                         bool any_wg_exceed_limit, int64_t reserve_size)
+                         int64_t reserve_size)
         : resource_ctx_(resource_ctx),
           cache_ratio_(cache_ratio),
-          any_wg_exceed_limit_(any_wg_exceed_limit),
           reserve_size_(reserve_size),
           query_id_(print_id(resource_ctx->task_controller()->task_id())) {
     enqueue_at = std::chrono::system_clock::now();
@@ -65,26 +64,8 @@ WorkloadGroupPtr 
WorkloadGroupMgr::get_or_create_workload_group(
     std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
     // 1. update internal wg's id
     if (fe_wg_info.name == INTERNAL_NORMAL_WG_NAME) {
-        WorkloadGroupPtr wg_ptr = nullptr;
-        uint64_t old_wg_id = -1;
-        auto before_wg_size = _workload_groups.size();
-        for (auto& wg_pair : _workload_groups) {
-            uint64_t wg_id = wg_pair.first;
-            WorkloadGroupPtr wg = wg_pair.second;
-            if (INTERNAL_NORMAL_WG_NAME == wg->name() && wg_id != 
fe_wg_info.id) {
-                wg_ptr = wg_pair.second;
-                old_wg_id = wg_id;
-                break;
-            }
-        }
-        if (wg_ptr) {
-            _workload_groups.erase(old_wg_id);
-            wg_ptr->set_id(fe_wg_info.id);
-            _workload_groups[wg_ptr->id()] = wg_ptr;
-            LOG(INFO) << "[topic_publish_wg] normal wg id changed, before: " 
<< old_wg_id
-                      << ", after:" << wg_ptr->id() << ", wg size:" << 
before_wg_size << ", "
-                      << _workload_groups.size();
-        }
+        // normal wg's id maybe not equal to BE's id, so that need update it
+        reset_workload_group_id(INTERNAL_NORMAL_WG_NAME, fe_wg_info.id);
     }
 
     // 2. check and update wg
@@ -94,7 +75,7 @@ WorkloadGroupPtr 
WorkloadGroupMgr::get_or_create_workload_group(
         return workload_group;
     }
 
-    auto new_task_group = std::make_shared<WorkloadGroup>(fe_wg_info);
+    auto new_task_group = WorkloadGroup::create_shared(fe_wg_info);
     _workload_groups[fe_wg_info.id] = new_task_group;
     return new_task_group;
 }
@@ -144,6 +125,27 @@ WorkloadGroupPtr 
WorkloadGroupMgr::get_group(std::vector<uint64_t>& id_list) {
     return ret_wg;
 }
 
+void WorkloadGroupMgr::reset_workload_group_id(std::string 
workload_group_name, uint64_t new_id) {
+    WorkloadGroupPtr wg_ptr = nullptr;
+    uint64_t old_wg_id = -1;
+    for (auto& wg_pair : _workload_groups) {
+        uint64_t wg_id = wg_pair.first;
+        WorkloadGroupPtr wg = wg_pair.second;
+        if (workload_group_name == wg->name() && wg_id != new_id) {
+            wg_ptr = wg_pair.second;
+            old_wg_id = wg_id;
+            break;
+        }
+    }
+    if (wg_ptr) {
+        _workload_groups.erase(old_wg_id);
+        wg_ptr->set_id(new_id);
+        _workload_groups[wg_ptr->id()] = wg_ptr;
+        LOG(INFO) << "workload group's id changed, before: " << old_wg_id
+                  << ", after:" << wg_ptr->id();
+    }
+}
+
 void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> 
used_wg_id) {
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
@@ -220,58 +222,24 @@ struct WorkloadGroupMemInfo {
             std::list<std::shared_ptr<MemTrackerLimiter>>();
 };
 
-void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
+void WorkloadGroupMgr::refresh_workload_group_memory_state() {
     std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
 
     // 1. make all workload groups memory snapshots(refresh workload groups 
total memory used at the same time)
     // and calculate total memory used of all queries.
     int64_t all_workload_groups_mem_usage = 0;
-    bool has_wg_exceed_limit = false;
     for (auto& [wg_id, wg] : _workload_groups) {
         all_workload_groups_mem_usage += wg->refresh_memory_usage();
-        if (wg->exceed_limit()) {
-            has_wg_exceed_limit = true;
-        }
     }
-    doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit = 
has_wg_exceed_limit;
     if (all_workload_groups_mem_usage <= 0) {
         return;
     }
 
-    // 2. calculate weighted memory limit ratio.
-    // when construct workload group, mem_limit is equal to 
(process_memory_limit * group_limit_percent),
-    // here, it is assumed that the available memory of workload groups is 
equal to process_memory_limit.
-    //
-    // but process_memory_usage is actually bigger than 
all_workload_groups_mem_usage,
-    // because public_memory of page cache, allocator cache, segment cache 
etc. are included in process_memory_usage.
-    // so actual available memory of the workload groups is equal to 
(process_memory_limit - public_memory)
-    //
-    // we will exclude this public_memory when calculate workload group 
mem_limit.
-    // so a ratio is calculated to multiply the workload group mem_limit from 
the previous construction.
-    auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
-    auto process_memory_limit = MemInfo::mem_limit();
-    double weighted_memory_limit_ratio = 1;
-    // if all_workload_groups_mem_usage is greater than process_memory_usage, 
it means that the memory statistics
-    // of the workload group are inaccurate.
-    // the reason is that query/load/etc. tracked is virtual memory, and 
virtual memory is not used in time.
-    //
-    // At this time, weighted_memory_limit_ratio is equal to 1, and workload 
group mem_limit is still equal to
-    // (process_memory_limit * group_limit_percent), this may cause query 
spill to occur earlier,
-    // However, there is no good solution at present, but we cannot predict 
when these virtual memory will be used.
-    if (all_workload_groups_mem_usage < process_memory_usage) {
-        int64_t public_memory = process_memory_usage - 
all_workload_groups_mem_usage;
-        weighted_memory_limit_ratio = 1 - (double)public_memory / 
(double)process_memory_limit;
-        // Round the value from 1% to 100%.
-        weighted_memory_limit_ratio = std::floor(weighted_memory_limit_ratio * 
100) / 100;
-    }
-
-    std::string debug_msg = fmt::format(
-            "\nProcess Memory Summary: {}, {}, all workload groups memory 
usage: {}, "
-            "weighted_memory_limit_ratio: {}",
-            doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
-            doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
-            PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
-            weighted_memory_limit_ratio);
+    std::string debug_msg =
+            fmt::format("\nProcess Memory Summary: {}, {}, all workload groups 
memory usage: {}",
+                        
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
+                        
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
+                        PrettyPrinter::print(all_workload_groups_mem_usage, 
TUnit::BYTES));
     LOG_EVERY_T(INFO, 60) << debug_msg;
     for (auto& wg : _workload_groups) {
         update_queries_limit_(wg.second, false);
@@ -300,7 +268,6 @@ void 
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
                 4, wg->get_metrics()->get_local_scan_bytes_per_second(), 
block);
         SchemaScannerHelper::insert_int64_value(
                 5, wg->get_metrics()->get_remote_scan_bytes_per_second(), 
block);
-        SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), 
block);
     }
 }
 
@@ -322,7 +289,7 @@ void WorkloadGroupMgr::add_paused_query(const 
std::shared_ptr<ResourceContext>&
     auto&& [it, inserted] = _paused_queries_list[wg].emplace(
             resource_ctx,
             
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
-            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+            reserve_size);
     // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
     // if hard limit is enabled, then not need enable other queries hard limit.
     if (inserted) {
@@ -351,7 +318,34 @@ void WorkloadGroupMgr::handle_paused_queries() {
     }
 
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
-    bool has_revoked_from_other_group = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto resource_ctx = query_it->resource_ctx_.lock();
+            // The query is finished during in paused list.
+            if (resource_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            // If there are any tasks that is cancelled and canceled time is 
less than 15 seconds, just break.
+            // because it may release memory and other tasks may not be 
cancelled and spill disk.
+            if (resource_ctx->task_controller()->is_cancelled() &&
+                resource_ctx->task_controller()->cancel_elapsed_millis() <
+                        config::wait_cancel_release_memory_ms) {
+                return;
+            }
+            ++query_it;
+        }
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+
     bool has_query_exceed_process_memlimit = false;
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
         auto& queries_list = it->second;
@@ -364,7 +358,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
         }
 
         bool has_changed_hard_limit = false;
-        int64_t flushed_memtable_bytes = 0;
+        bool exceed_low_watermark = false;
+        bool exceed_high_watermark = false;
+        wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
         // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
         // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
         // by weighted value. So if reserve failed, then it is actually exceed 
limit.
@@ -376,12 +372,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 query_it = queries_list.erase(query_it);
                 continue;
             }
-            if (resource_ctx->task_controller()->is_cancelled()) {
-                LOG(INFO) << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
-                          << " was canceled, remove from paused list";
-                query_it = queries_list.erase(query_it);
-                continue;
-            }
 
             if (resource_ctx->task_controller()
                         ->paused_reason()
@@ -423,7 +413,10 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 if (resource_ctx->memory_context()->adjusted_mem_limit() <
                     resource_ctx->memory_context()->current_memory_bytes() +
                             query_it->reserve_size_) {
-                    
resource_ctx->memory_context()->effect_adjusted_mem_limit();
+                    // The query not exceed the query limit, but exceed the 
expected query limit when the workload
+                    // group memory is not enough, use the litter memory limit 
to let the query exceed query limit.
+                    resource_ctx->memory_context()->set_mem_limit(
+                            
resource_ctx->memory_context()->adjusted_mem_limit());
                     
resource_ctx->task_controller()->set_memory_sufficient(true);
                     LOG(INFO) << "Workload group memory reserve failed because 
"
                               << 
resource_ctx->task_controller()->debug_string() << " reserve size "
@@ -435,15 +428,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     query_it = queries_list.erase(query_it);
                     continue;
                 }
-                if (flushed_memtable_bytes <= 0) {
-                    flushed_memtable_bytes = flush_memtable_from_group_(wg);
-                }
-                if (flushed_memtable_bytes > 0) {
-                    // Flushed some memtable, just wait flush finished and not 
do anything more.
-                    wg->enable_write_buffer_limit(true);
-                    ++query_it;
-                    continue;
-                }
 
                 // when running here, current query adjusted_mem_limit < query 
memory consumption + reserve_size,
                 // which means that the current query itself has not exceeded 
the memory limit.
@@ -498,11 +482,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 } else {
                     // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
                     // and then set wg's flag, other query may not free memory 
very quickly.
-                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                    // If the workload group's memusage is less than low 
watermark then dispatch the query to run.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms ||
+                        !exceed_low_watermark) {
                         // set wg's memory to sufficient, then add it back to 
task scheduler to run.
                         LOG(INFO) << "Query: "
                                   << 
print_id(resource_ctx->task_controller()->task_id())
-                                  << " will be resume.";
+                                  << " has waited in paused query queue for "
+                                  << query_it->elapsed_time() << " ms. Resume 
it.";
                         
resource_ctx->task_controller()->set_memory_sufficient(true);
                         query_it = queries_list.erase(query_it);
                         continue;
@@ -512,9 +499,21 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     }
                 }
             } else {
+                if (revoking_memory_from_other_query_) {
+                    // Previously, we have revoked memory from other query, 
and the cancel stage finished.
+                    // So, resume all queries now.
+                    
resource_ctx->task_controller()->set_memory_sufficient(true);
+                    VLOG_DEBUG << "Query " << 
print_id(resource_ctx->task_controller()->task_id())
+                               << " is blocked due to process memory not 
enough, but already "
+                                  "cancelled some queries, resumt it now.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
                 has_query_exceed_process_memlimit = true;
                 // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
                 // used too much memory. Should clean all cache here.
+                // Clear all cache not part of cache, because the cache thread 
already try to release cache step
+                // by step. And it is not useful.
                 //
                 // here query is paused because of PROCESS_MEMORY_EXCEEDED,
                 // normally, before process memory exceeds, daemon thread 
`refresh_cache_capacity` will
@@ -541,68 +540,48 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
                 // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
                 // execute, this query will never be resumed, and will 
deadlock here.
-                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
-                    config::disable_memory_gc) {
-                    // 1. Check if could revoke some memory from memtable
-                    if (flushed_memtable_bytes <= 0) {
-                        // if the process memory has exceeded the limit, it is 
expected that
-                        // `MemTableMemoryLimiter` will flush most of the 
memtable.
-                        // but if the process memory is not exceeded, and the 
current query expected reserve memory
-                        // to be too large, the other parts of the process 
cannot perceive the reserve memory size,
-                        // so it is expected to flush memtable in 
`handle_paused_queries`.
-                        flushed_memtable_bytes = 
flush_memtable_from_group_(wg);
-                    }
-                    if (flushed_memtable_bytes > 0) {
-                        // Flushed some memtable, just wait flush finished and 
not do anything more.
-                        wg->enable_write_buffer_limit(true);
-                        ++query_it;
-                        continue;
-                    }
-                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
-                    if (!has_revoked_from_other_group) {
-                        // `need_free_mem` is equal to the `reserve_size_` of 
the first query
-                        // that `handle_paused_queries` reaches here this time.
-                        // this means that at least `reserve_size_` memory is 
released from other wgs.
-                        // the released memory at least allows the current 
query to execute,
-                        // but we will wake up all queries after this 
`handle_paused_queries`,
-                        // even if the released memory is not enough for all 
queries to execute,
-                        // but this can simplify the behavior and omit the 
query priority.
-                        int64_t revoked_size = 
revoke_memory_from_other_overcommited_groups_(
-                                resource_ctx, query_it->reserve_size_);
-                        if (revoked_size > 0) {
-                            has_revoked_from_other_group = true;
-                            
resource_ctx->task_controller()->set_memory_sufficient(true);
-                            VLOG_DEBUG << "Query: "
-                                       << 
print_id(resource_ctx->task_controller()->task_id())
-                                       << " is resumed after revoke memory 
from other group.";
-                            query_it = queries_list.erase(query_it);
-                            // Do not care if the revoked_size > reserve size, 
and try to run again.
-                            continue;
+                if (query_it->cache_ratio_ < 0.05 || 
config::disable_memory_gc) {
+                    // If workload group's memory usage > min memory, then it 
means the workload group use too much memory
+                    // in memory contention state. Should just spill
+                    if (wg->total_mem_used() > wg->min_memory_limit()) {
+                        auto revocable_tasks =
+                                
resource_ctx->task_controller()->get_revocable_tasks();
+                        if (revocable_tasks.empty()) {
+                            Status status = Status::MemoryLimitExceeded(
+                                    "Workload group memory usage {} > min 
memory {}, but no "
+                                    "revocable tasks",
+                                    wg->total_mem_used(), 
wg->min_memory_limit());
+                            
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                                    
resource_ctx->task_controller()->task_id(), status);
+                            revoking_memory_from_other_query_ = true;
+                            // If any query is cancelled, then skip others 
because it will release many memory and
+                            // other query may not need release memory.
+                            return;
                         } else {
-                            bool spill_res = handle_single_query_(
-                                    resource_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
-                                    
resource_ctx->task_controller()->paused_reason());
-                            if (spill_res) {
-                                VLOG_DEBUG << "Query: "
-                                           << 
print_id(resource_ctx->task_controller()->task_id())
-                                           << " remove from paused list";
-                                query_it = queries_list.erase(query_it);
-                                continue;
-                            } else {
-                                ++query_it;
-                                continue;
+                            SCOPED_ATTACH_TASK(resource_ctx);
+                            auto status = 
resource_ctx->task_controller()->revoke_memory();
+                            if (!status.ok()) {
+                                
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                                        
resource_ctx->task_controller()->task_id(), status);
+                                revoking_memory_from_other_query_ = true;
+                                return;
                             }
+                            query_it = queries_list.erase(query_it);
+                            continue;
                         }
-                    } else {
-                        // If any query is cancelled during process limit 
stage, should resume other query and
-                        // do not do any check now.
-                        
resource_ctx->task_controller()->set_memory_sufficient(true);
-                        VLOG_DEBUG
-                                << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
-                                << " remove from paused list";
-                        query_it = queries_list.erase(query_it);
-                        continue;
                     }
+
+                    // Other workload groups many use a lot of memory, should 
revoke memory from other workload groups
+                    // by cancelling their queries.
+                    int64_t revoked_size = revoke_memory_from_other_groups_();
+                    if (revoked_size > 0) {
+                        // Revoke memory from other workload groups will 
cancel some queries, wait them cancel finished
+                        // and then check it again.
+                        revoking_memory_from_other_query_ = true;
+                        return;
+                    }
+
+                    // TODO revoke from memtable
                 }
                 // `cache_ratio_ > 0.05` means that the cache has not been 
cleared
                 // when the query enters the paused state.
@@ -625,15 +604,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
 
         // even if wg has no query in the paused state, the following code 
will still be executed
         // because `handle_paused_queries` adds a <wg, empty set> to 
`_paused_queries_list` at the beginning.
-
-        bool is_low_watermark = false;
-        bool is_high_watermark = false;
-        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
-        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
-        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
-            wg->enable_write_buffer_limit(false);
-        }
-
         if (queries_list.empty()) {
             it = _paused_queries_list.erase(it);
             continue;
@@ -642,6 +612,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
             ++it;
         }
     }
+    // Attention: has to be here. It means, no query is at cancelling state 
and all query blocked by process
+    // not enough has been resumed.
+    revoking_memory_from_other_query_ = false;
 
     if (!has_query_exceed_process_memlimit &&
         
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
 < 0.05) {
@@ -656,126 +629,55 @@ void WorkloadGroupMgr::handle_paused_queries() {
     }
 }
 
-// Return the expected free bytes if wg's memtable memory is greater than Max.
-int64_t WorkloadGroupMgr::flush_memtable_from_group_(WorkloadGroupPtr wg) {
-    // If there are a lot of memtable memory, then wait them flush finished.
-    MemTableMemoryLimiter* memtable_limiter =
-            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
-    int64_t memtable_active_bytes = 0;
-    int64_t memtable_queue_bytes = 0;
-    int64_t memtable_flush_bytes = 0;
-    DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr";
-    memtable_limiter->get_workload_group_memtable_usage(
-            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
-    int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
-    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
-        max_wg_memtable_bytes) {
-        auto max_wg_active_memtable_bytes = 
(int64_t)(static_cast<double>(max_wg_memtable_bytes) *
-                                                      
config::load_max_wg_active_memtable_percent);
-        // There are many table in flush queue, just waiting them flush 
finished.
-        if (memtable_active_bytes < max_wg_active_memtable_bytes) {
-            LOG_EVERY_T(INFO, 60) << wg->name()
-                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
-                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
-                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
-                                  << " wait for flush finished to release more 
memory";
-            return memtable_queue_bytes + memtable_flush_bytes;
-        } else {
-            // Flush some memtables(currently written) to flush queue.
-            memtable_limiter->flush_workload_group_memtables(
-                    wg->id(), memtable_active_bytes - 
max_wg_active_memtable_bytes);
-            LOG_EVERY_T(INFO, 60) << wg->name()
-                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
-                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
-                                  << ", flush some active memtable to revoke 
memory";
-            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
-                   max_wg_active_memtable_bytes;
-        }
-    }
-    return 0;
-}
-
-// Revoke memory from workload group that exceed it's limit. For example, if 
the wg's limit is 10g, but used 12g
-// then should revoke 2g from the group.
-int64_t WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_(
-        std::shared_ptr<ResourceContext> requestor, int64_t need_free_mem) {
-    int64_t freed_mem = 0;
+// Find the workload group that could revoke lot of memory:
+// 1. workload group = max(total used memory - min memory that should reserved 
for it)
+// 2. revoke 10% memory of the workload group that exceeded. For example, if 
the workload group exceed 10g,
+//    then revoke 1g memory.
+// 3. After revoke memory, go to the loop and wait for the query to be 
cancelled and check again.
+int64_t WorkloadGroupMgr::revoke_memory_from_other_groups_() {
     MonotonicStopWatch watch;
     watch.start();
     std::unique_ptr<RuntimeProfile> profile =
-            
std::make_unique<RuntimeProfile>("RevokeMemoryFromOtherOvercommitedGroups");
+            std::make_unique<RuntimeProfile>("RevokeMemoryFromOtherGroups");
 
-    using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
-    auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
-        return left.second < right.second;
-    };
-    std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, 
decltype(cmp)>
-            exceeded_memory_heap(cmp);
-    int64_t total_exceeded_memory = 0;
+    WorkloadGroupPtr max_wg = nullptr;
+    int64_t max_exceeded_memory = 0;
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
         for (auto& workload_group : _workload_groups) {
-            // TODO should use min memory percent to check
-            if (!workload_group.second->exceed_limit()) {
+            int64_t min_memory_limit = 
workload_group.second->min_memory_limit();
+            int64_t total_used_memory = 
workload_group.second->total_mem_used();
+            if (total_used_memory <= min_memory_limit) {
+                // min memory is reserved for this workload group, if it used 
less than min memory,
+                // then not revoke memory from it.
                 continue;
             }
-            if (requestor->workload_group() != nullptr &&
-                workload_group.second->id() == 
requestor->workload_group()->id()) {
-                continue;
+            if (total_used_memory - min_memory_limit > max_exceeded_memory) {
+                max_wg = workload_group.second;
+                max_exceeded_memory = total_used_memory - min_memory_limit;
             }
-            auto exceeded_memory =
-                    workload_group.second->memory_used() - 
workload_group.second->memory_limit();
-            exceeded_memory_heap.emplace(workload_group.second, 
exceeded_memory);
-            total_exceeded_memory += exceeded_memory;
         }
     }
-
-    auto revoke_reason = fmt::format(
-            "{} try reserve {} bytes failed, revoke memory from other 
overcommited groups",
-            requestor->memory_context()->mem_tracker()->label(), 
need_free_mem);
+    if (max_wg == nullptr) {
+        return 0;
+    }
+    if (max_exceeded_memory < 1 << 27) {
+        LOG(INFO) << "The workload group that exceed most memory is :"
+                  << max_wg->memory_debug_string() << ", max_exceeded_memory: "
+                  << PrettyPrinter::print(max_exceeded_memory, TUnit::BYTES)
+                  << " less than 128MB, no need to revoke memory";
+        return 0;
+    }
+    int64_t freed_mem = static_cast<int64_t>((double)max_exceeded_memory * 
0.1);
+    // Revoke 10% of memory from the workload group that exceed most memory
+    max_wg->revoke_memory(freed_mem, "exceed_memory", profile.get());
+    std::stringstream ss;
+    profile->pretty_print(&ss);
     LOG(INFO) << fmt::format(
-            "[MemoryGC] start 
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, {}, "
-            "number of overcommited groups: {}, total exceeded memory: {}.",
-            revoke_reason, exceeded_memory_heap.size(),
-            PrettyPrinter::print_bytes(total_exceeded_memory));
-    Defer defer {[&]() {
-        std::stringstream ss;
-        profile->pretty_print(&ss);
-        LOG(INFO) << fmt::format(
-                "[MemoryGC] end 
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, "
-                "{}, number of overcommited groups: {}, free memory {}. 
cost(us): {}, details: {}",
-                revoke_reason, exceeded_memory_heap.size(), 
PrettyPrinter::print_bytes(freed_mem),
-                watch.elapsed_time() / 1000, ss.str());
-    }};
-
-    // 1. check memtable usage, and try to flush them and not wait for 
finished.
-    // TODO, there are two problems with flushing the memtable of other 
overcommited groups:
-    //      1. When should enable_write_buffer_limit be set back to false?
-    //      2. Flushing the memtable may be slow, current query may have to 
wait for a long time.
-    // auto heap_copy = heap;
-    // while (!heap_copy.empty() && need_free_mem - freed_mem > 0 &&
-    //         !requestor->task_controller()->is_cancelled()) {
-    //     auto [wg, sort_mem] = heap_copy.top();
-    //     heap_copy.pop();
-    //     if (wg->exceed_limit() && !wg->enable_write_buffer_limit()) { // is 
overcommited
-    //         int64_t flushed_memtable_bytes = flush_memtable_from_group_(wg);
-    //         if (flushed_memtable_bytes > 0) {
-    //             wg->enable_write_buffer_limit(true);
-    //         }
-    //         freed_mem += flushed_memtable_bytes;
-    //     }
-    // }
-
-    // 2. cancel top usage query in other overcommit group, one by one.
-    // Sort all memory limiter in all overcommit wg, and cancel the top usage 
task that with most memory.
-    // Maybe not valid because it's memory not exceed limit.
-    while (!exceeded_memory_heap.empty() && need_free_mem - freed_mem > 0 &&
-           !requestor->task_controller()->is_cancelled()) {
-        auto [wg, exceeded_memory] = exceeded_memory_heap.top();
-        exceeded_memory_heap.pop();
-        freed_mem += wg->revoke_memory(std::min(exceeded_memory, need_free_mem 
- freed_mem),
-                                       revoke_reason, profile.get());
-    }
+            "[MemoryGC] process memory not enough, revoke memory from 
workload_group: {}, "
+            "free memory {}. cost(us): {}, details: {}",
+            max_wg->memory_debug_string(), 
PrettyPrinter::print_bytes(freed_mem),
+            watch.elapsed_time() / 1000, ss.str());
     return freed_mem;
 }
 
@@ -836,6 +738,10 @@ bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<ResourceContex
                                 ->memory_profile()
                                 ->process_memory_detail_str());
                 LOG_LONG_STRING(INFO, log_str);
+                // Disable reserve memory will enable query level memory 
check, if the query
+                // need a lot of memory than the memory limit, it will be 
killed.
+                // Do not need set memlimit = ajusted_mem_limit because 
workload group refresher thread
+                // will update automatically.
                 requestor->task_controller()->disable_reserve_memory();
                 requestor->task_controller()->set_memory_sufficient(true);
                 return true;
@@ -923,40 +829,23 @@ bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<ResourceContex
 void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool 
enable_hard_limit) {
     auto wg_mem_limit = wg->memory_limit();
     auto all_resource_ctxs = wg->resource_ctxs();
-    bool is_low_watermark = false;
-    bool is_high_watermark = false;
-    wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+    bool exceed_low_watermark = false;
+    bool exceed_high_watermark = false;
+    wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
     int64_t wg_high_water_mark_limit =
             (int64_t)(static_cast<double>(wg_mem_limit) * 
wg->memory_high_watermark() * 1.0 / 100);
-    int64_t memtable_usage = wg->write_buffer_size();
     int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
-    if (memtable_usage > wg->write_buffer_limit()) {
-        wg_high_water_mark_except_load = wg_high_water_mark_limit - 
wg->write_buffer_limit();
-    } else {
-        wg_high_water_mark_except_load =
-                wg_high_water_mark_limit - memtable_usage - 10 * 1024 * 1024;
-    }
     std::string debug_msg;
-    if (is_high_watermark || is_low_watermark) {
+    if (exceed_high_watermark || exceed_low_watermark) {
         debug_msg = fmt::format(
                 "\nWorkload Group {}: mem limit: {}, mem used: {}, "
-                "high water mark mem limit: {}, load memtable usage: {}, used 
ratio: {}",
+                "high water mark mem limit: {}, used ratio: {}",
                 wg->name(), PrettyPrinter::print(wg->memory_limit(), 
TUnit::BYTES),
                 PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
                 PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
-                PrettyPrinter::print(memtable_usage, TUnit::BYTES),
                 (double)(wg->total_mem_used()) / 
static_cast<double>(wg_mem_limit));
     }
 
-    // If reached low watermark, then enable load buffer limit
-    if (is_low_watermark) {
-        wg->enable_write_buffer_limit(true);
-    }
-    // Both enable overcommit and not enable overcommit, if user set slot 
memory policy
-    // then we will replace the memtracker's memlimit with
-    if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
-        return;
-    }
     int32_t total_used_slot_count = 0;
     int32_t total_slot_count = wg->total_query_slot_count();
     // calculate total used slot count
@@ -977,13 +866,20 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         if (!resource_ctx) {
             continue;
         }
-        if (is_low_watermark) {
+        if (exceed_low_watermark) {
             resource_ctx->task_controller()->set_low_memory_mode(true);
         }
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
-        // If the query enable hard limit, then it should not use the soft 
limit
-        if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
+        if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+            query_weighted_mem_limit = 
resource_ctx->memory_context()->user_set_mem_limit();
+            // If the policy is NONE, we use the query's memory limit. but the 
query's memory limit
+            // should not be greater than the workload group's memory limit.
+            if (query_weighted_mem_limit > wg_mem_limit) {
+                query_weighted_mem_limit = wg_mem_limit;
+            }
+            expected_query_weighted_mem_limit = query_weighted_mem_limit;
+        } else if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
             // TODO, `Policy::FIXED` expects `all_query_used_slot_count < 
wg_total_slot_count`,
             // which is controlled when query is submitted
             // DCEHCK(total_used_slot_count <= total_slot_count);
@@ -991,6 +887,7 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
                 LOG(WARNING)
                         << "Query " << 
print_id(resource_ctx->task_controller()->task_id())
                         << " enabled hard limit, but the slot count < 1, could 
not take affect";
+                continue;
             } else {
                 // If the query enable hard limit, then not use weighted info 
any more, just use the settings limit.
                 query_weighted_mem_limit =
@@ -999,7 +896,7 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
                                   total_slot_count);
                 expected_query_weighted_mem_limit = query_weighted_mem_limit;
             }
-        } else {
+        } else if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DYNAMIC) {
             // If low water mark is not reached, then use process memory limit 
as query memory limit.
             // It means it will not take effect.
             // If there are some query in paused list, then limit should take 
effect.
@@ -1011,7 +908,7 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
                                         
resource_ctx->task_controller()->get_slot_count() * 1.0 /
                                         total_used_slot_count)
                             : wg_high_water_mark_except_load;
-            if (!is_low_watermark && !enable_hard_limit) {
+            if (!exceed_low_watermark && !enable_hard_limit) {
                 query_weighted_mem_limit = wg_high_water_mark_except_load;
             } else {
                 query_weighted_mem_limit = expected_query_weighted_mem_limit;
@@ -1021,7 +918,11 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         // If the query is a pure load task, then should not modify its limit. 
Or it will reserve
         // memory failed and we did not hanle it.
         if (!resource_ctx->task_controller()->is_pure_load_task()) {
-            
resource_ctx->memory_context()->set_mem_limit(query_weighted_mem_limit);
+            // If user's set mem limit is less than query weighted mem limit, 
then should not modify its limit.
+            // Use user settings.
+            if (resource_ctx->memory_context()->user_set_mem_limit() > 
query_weighted_mem_limit) {
+                
resource_ctx->memory_context()->set_mem_limit(query_weighted_mem_limit);
+            }
             resource_ctx->memory_context()->set_adjusted_mem_limit(
                     expected_query_weighted_mem_limit);
         }
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index 87e7943d436..20f8a926148 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -46,11 +46,10 @@ public:
     std::chrono::system_clock::time_point enqueue_at;
     size_t last_mem_usage {0};
     double cache_ratio_ {0.0};
-    bool any_wg_exceed_limit_ {false};
     int64_t reserve_size_ {0};
 
     PausedQuery(std::shared_ptr<ResourceContext> resource_ctx_, double 
cache_ratio,
-                bool any_wg_exceed_limit, int64_t reserve_size);
+                int64_t reserve_size);
 
     int64_t elapsed_time() const {
         auto now = std::chrono::system_clock::now();
@@ -76,11 +75,15 @@ public:
 
     WorkloadGroupPtr get_group(std::vector<uint64_t>& id_list);
 
+    // This method is used during workload group listener to update internal 
workload group's id.
+    // This method does not acquire locks, so it should be called in a locked 
context.
+    void reset_workload_group_id(std::string workload_group_name, uint64_t 
new_id);
+
     void do_sweep();
 
     void stop();
 
-    void refresh_wg_weighted_memory_limit();
+    void refresh_workload_group_memory_state();
 
     void get_wg_resource_usage(vectorized::Block* block);
 
@@ -99,11 +102,9 @@ private:
 
     WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& 
workload_group_info);
 
-    int64_t flush_memtable_from_group_(WorkloadGroupPtr wg);
     bool handle_single_query_(const std::shared_ptr<ResourceContext>& 
requestor,
                               size_t size_to_reserve, int64_t time_in_queue, 
Status paused_reason);
-    int64_t revoke_memory_from_other_overcommited_groups_(
-            std::shared_ptr<ResourceContext> requestor, int64_t need_free_mem);
+    int64_t revoke_memory_from_other_groups_();
     void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit);
 
     std::shared_mutex _group_mutex;
@@ -115,6 +116,9 @@ private:
     // workload group, because we need do some coordinate work globally.
     std::mutex _paused_queries_lock;
     std::map<WorkloadGroupPtr, std::set<PausedQuery>> _paused_queries_list;
+    // If any query is cancelled when process memory is not enough, we set 
this to true.
+    // When there is not query in cancel state, this var is set to false.
+    bool revoking_memory_from_other_query_ = false;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h 
b/be/src/runtime/workload_management/memory_context.h
index 0e04a624ccc..027f2fcb61d 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -89,13 +89,14 @@ public:
     void set_mem_limit(int64_t new_mem_limit) const { 
mem_tracker_->set_limit(new_mem_limit); }
     int64_t mem_limit() const { return mem_tracker_->limit(); }
 
+    int64_t user_set_mem_limit() const { return user_set_mem_limit_; }
+
     // The new memlimit should be less than user set memlimit.
     void set_adjusted_mem_limit(int64_t new_mem_limit) {
         adjusted_mem_limit_ = std::min<int64_t>(new_mem_limit, 
user_set_mem_limit_);
     }
     // Expected mem limit is the limit when workload group reached limit.
     int64_t adjusted_mem_limit() { return adjusted_mem_limit_; }
-    void effect_adjusted_mem_limit() { set_mem_limit(adjusted_mem_limit_); }
 
     int64_t current_memory_bytes() const { return mem_tracker_->consumption(); 
}
     int64_t peak_memory_bytes() const { return 
mem_tracker_->peak_consumption(); }
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp 
b/be/src/runtime/workload_management/query_task_controller.cpp
index 6f7ac130d3d..43ef520b794 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -24,8 +24,9 @@
 namespace doris {
 #include "common/compile_check_begin.h"
 
-std::unique_ptr<TaskController> QueryTaskController::create(QueryContext* 
query_ctx) {
-    return QueryTaskController::create_unique(query_ctx->shared_from_this());
+std::unique_ptr<TaskController> QueryTaskController::create(
+        std::shared_ptr<QueryContext> query_ctx) {
+    return QueryTaskController::create_unique(query_ctx);
 }
 
 bool QueryTaskController::is_cancelled() const {
diff --git a/be/src/runtime/workload_management/query_task_controller.h 
b/be/src/runtime/workload_management/query_task_controller.h
index 6a1d11076c4..64681177f92 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -29,7 +29,7 @@ class QueryTaskController : public TaskController {
     ENABLE_FACTORY_CREATOR(QueryTaskController);
 
 public:
-    static std::unique_ptr<TaskController> create(QueryContext* query_ctx);
+    static std::unique_ptr<TaskController> 
create(std::shared_ptr<QueryContext> query_ctx);
     ~QueryTaskController() override = default;
 
     bool is_cancelled() const override;
diff --git a/be/src/runtime/workload_management/task_controller.h 
b/be/src/runtime/workload_management/task_controller.h
index 0ecf1d28541..de3a3db6fef 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -77,6 +77,8 @@ public:
         return cancel_impl(reason);
     }
 
+    int64_t cancel_elapsed_millis() const { return MonotonicMillis() - 
cancelled_time_; }
+
     virtual bool cancel_impl(const Status& reason) { return false; }
 
     int64_t cancelled_time() const { return cancelled_time_; }
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index bbaee48ceca..ae2a6ab2a93 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -564,8 +564,8 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
-                _state->workload_group(), [state = _state]() { return 
state->is_cancelled(); });
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+                [state = _state]() { return state->is_cancelled(); });
         if (_state->is_cancelled()) {
             return _state->cancel_reason();
         }
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index e372f5443ce..bf7b0d5834b 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -169,7 +169,7 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
         ASSERT_TRUE(res.ok());
     }
     static_cast<void>(mem_limiter->init(100));
-    mem_limiter->_handle_memtable_flush(nullptr, nullptr);
+    mem_limiter->handle_memtable_flush(nullptr);
     CHECK_EQ(0, mem_limiter->mem_usage());
 
     res = delta_writer->close();
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp 
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index 8b5e3a8796f..d3c589552f2 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -125,6 +125,8 @@ TEST_F(WorkloadGroupManagerTest, 
get_or_create_workload_group) {
     ASSERT_EQ(wg->id(), 0);
 }
 
+// Query is paused due to query memlimit exceed, after waiting in queue for  
spill_in_paused_queue_timeout_ms
+// it should be resumed
 TEST_F(WorkloadGroupManagerTest, query_exceed) {
     auto wg = _wg_manager->get_or_create_workload_group({});
     auto query_context = _generate_on_query(wg);
@@ -132,19 +134,21 @@ TEST_F(WorkloadGroupManagerTest, query_exceed) {
     query_context->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024);
     query_context->query_mem_tracker()->consume(1024 * 4);
 
+    std::cout << config::spill_in_paused_queue_timeout_ms << std::endl;
+
     _wg_manager->add_paused_query(query_context->resource_ctx(), 1024L * 1024 
* 1024,
                                   
Status::Error(ErrorCode::QUERY_MEMORY_EXCEEDED, "test"));
     {
         std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
         ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
-                << "pasued queue should not be empty";
+                << "paused queue should not be empty";
     }
 
     query_context->query_mem_tracker()->consume(-1024 * 4);
     _run_checking_loop(wg);
 
     std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
-    ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued 
queue should be empty";
+    ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "paused 
queue should be empty";
     ASSERT_EQ(query_context->is_cancelled(), false) << "query should be not 
canceled";
     
ASSERT_EQ(query_context->resource_ctx()->task_controller()->is_enable_reserve_memory(),
 false)
             << "query should disable reserve memory";
@@ -172,7 +176,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed1) {
 
     std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
     ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued 
queue should be empty";
-    ASSERT_EQ(query_context->is_cancelled(), false) << "query should be 
canceled";
+    ASSERT_EQ(query_context->is_cancelled(), false) << "query should not be 
canceled";
 }
 
 // TWgSlotMemoryPolicy::NONE
@@ -201,6 +205,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed2) {
 
 // TWgSlotMemoryPolicy::NONE
 // query_ctx->workload_group()->exceed_limit() == true
+// query limit > workload group limit
+// query's limit will be set to workload group limit
 TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
     WorkloadGroupInfo wg_info {
             .id = 1, .memory_limit = 1024L * 1024, .slot_mem_policy = 
TWgSlotMemoryPolicy::NONE};
@@ -209,12 +215,15 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
 
     query_context->query_mem_tracker()->consume(1024L * 1024 * 4);
 
+    // adjust memlimit is larger than mem limit
+    
query_context->resource_ctx()->memory_context()->set_adjusted_mem_limit(1024L * 
1024 * 10);
+
     _wg_manager->add_paused_query(query_context->resource_ctx(), 1024L,
                                   
Status::Error(ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED, "test"));
     {
         std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
         ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
-                << "pasued queue should not be empty";
+                << "paused queue should not be empty";
     }
 
     wg->refresh_memory_usage();
@@ -224,10 +233,19 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
 
     // Query was not cancelled, because the query's limit is bigger than the 
wg's limit and the wg's policy is NONE.
     ASSERT_FALSE(query_context->is_cancelled());
-    ASSERT_GT(query_context->resource_ctx()->memory_context()->mem_limit(), 
wg->memory_limit());
+    // Its limit == workload group's limit
+    ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(), 
wg->memory_limit());
 
     std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
-    ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued 
queue should be empty";
+    ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty())
+            << "paused queue should be empty, because the query will be 
resumed";
+    // Query's memory usage + reserve size > adjusted memory size it will be 
resumed
+    // it's memlimit will be set to adjusted size.
+    
ASSERT_EQ(query_context->resource_ctx()->task_controller()->is_enable_reserve_memory(),
 true)
+            << "query should disable reserve memory";
+    // adjust memlimit is larger than workload group memlimit, so adjust 
memlimit is reset to workload group mem limit.
+    
ASSERT_EQ(query_context->resource_ctx()->memory_context()->adjusted_mem_limit(),
+              wg->memory_limit());
 }
 
 // TWgSlotMemoryPolicy::FIXED
@@ -251,7 +269,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
                 << "pasued queue should not be empty";
     }
 
-    _wg_manager->refresh_wg_weighted_memory_limit();
+    _wg_manager->refresh_workload_group_memory_state();
     LOG(INFO) << "***** wg usage " << wg->refresh_memory_usage();
     _run_checking_loop(wg);
 
@@ -260,7 +278,7 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
     LOG(INFO) << "***** query_context->get_mem_limit(): "
               << query_context->resource_ctx()->memory_context()->mem_limit();
     const auto delta = 
std::abs(query_context->resource_ctx()->memory_context()->mem_limit() -
-                                ((1024L * 1024 * 100 * 95) / 100 - 10 * 1024 * 
1024) / 5);
+                                (1024L * 1024 * 100 * 95) / 100 / 5);
     ASSERT_LE(delta, 1);
 
     std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
@@ -271,6 +289,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed4) {
 TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
     WorkloadGroupInfo wg_info {.id = 1,
                                .memory_limit = 1024L * 1024 * 100,
+                               .min_memory_percent = 10,
+                               .max_memory_percent = 100,
                                .memory_low_watermark = 80,
                                .memory_high_watermark = 95,
                                .total_query_slot_count = 5,
@@ -285,10 +305,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
     {
         std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
         ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
-                << "pasued queue should not be empty";
+                << "paused queue should not be empty";
     }
 
-    _wg_manager->refresh_wg_weighted_memory_limit();
+    _wg_manager->refresh_workload_group_memory_state();
     LOG(INFO) << "***** wg usage " << wg->refresh_memory_usage();
     _run_checking_loop(wg);
 
@@ -296,8 +316,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
     
ASSERT_TRUE(query_context->resource_ctx()->task_controller()->paused_reason().ok());
     LOG(INFO) << "***** query_context->get_mem_limit(): "
               << query_context->resource_ctx()->memory_context()->mem_limit();
+
+    // + slot count, because in query memlimit it + slot count
     ASSERT_LE(query_context->resource_ctx()->memory_context()->mem_limit(),
-              (1024L * 1024 * 100 * 95) / 100);
+              ((1024L * 1024 * 100 * 95) / 100 + 5));
 
     std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
     ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued 
queue should be empty";
@@ -383,90 +405,119 @@ TEST_F(WorkloadGroupManagerTest, query_released) {
     ASSERT_TRUE(_wg_manager->_paused_queries_list[wg].empty()) << "pasued 
queue should be empty";
 }
 
-TEST_F(WorkloadGroupManagerTest, overcommit1) {
-    WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 100};
-    WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 100};
-    WorkloadGroupInfo wg3_info {.id = 3, .memory_limit = 1024L * 1024 * 100};
-    WorkloadGroupInfo wg4_info {.id = 4, .memory_limit = 1024L * 1024 * 1024 * 
10};
-    WorkloadGroupInfo wg5_info {.id = 5, .memory_limit = 1024L * 1024 * 1024 * 
100};
+TEST_F(WorkloadGroupManagerTest, ProcessMemoryNotEnough) {
+    WorkloadGroupInfo wg1_info {.id = 1,
+                                .memory_limit = 1024L * 1024 * 1000,
+                                .min_memory_percent = 10,
+                                .max_memory_percent = 100};
+    WorkloadGroupInfo wg2_info {.id = 2,
+                                .memory_limit = 1024L * 1024 * 1000,
+                                .min_memory_percent = 10,
+                                .max_memory_percent = 100};
+    WorkloadGroupInfo wg3_info {.id = 3,
+                                .memory_limit = 1024L * 1024 * 1000,
+                                .min_memory_percent = 10,
+                                .max_memory_percent = 100};
+
     auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
     auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);
     auto wg3 = _wg_manager->get_or_create_workload_group(wg3_info);
-    auto wg4 = _wg_manager->get_or_create_workload_group(wg4_info);
-    auto wg5 = _wg_manager->get_or_create_workload_group(wg5_info);
+
     EXPECT_EQ(wg1->id(), wg1_info.id);
     EXPECT_EQ(wg2->id(), wg2_info.id);
     EXPECT_EQ(wg3->id(), wg3_info.id);
-    EXPECT_EQ(wg5->id(), wg5_info.id);
+
+    EXPECT_EQ(1024L * 1024 * 100, wg1->min_memory_limit());
 
     auto query_context11 = _generate_on_query(wg1);
+    query_context11->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context11->query_mem_tracker()->consume(1024 * 1024 * 10);
+
+    wg1->refresh_memory_usage();
+    wg2->refresh_memory_usage();
+    wg3->refresh_memory_usage();
+
+    // There is no query in workload groups, so that revoke memory will return 0
+    EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
 
-    // wg2 is overcommited, some query is overcommited
+    // If exceed memory less than 128MB, then not revoke
     auto query_context21 = _generate_on_query(wg2);
+    query_context21->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context21->query_mem_tracker()->consume(1024 * 1024 * 50);
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 50);
+    EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+    // There is not workload group's memory usage > it's min memory limit.
+    EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
+    ASSERT_FALSE(query_context21->is_cancelled());
+
+    // Add another query that use a lot of memory
     auto query_context22 = _generate_on_query(wg2);
+    query_context22->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context22->query_mem_tracker()->consume(1024 * 1024 * 60);
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 110);
+    EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+    // Could not revoke larger than 128MB, not revoke.
+    EXPECT_EQ(0, _wg_manager->revoke_memory_from_other_groups_());
+    ASSERT_FALSE(query_context21->is_cancelled());
+    ASSERT_FALSE(query_context22->is_cancelled());
+
+    // Add another query that use a lot of memory
     auto query_context23 = _generate_on_query(wg2);
+    query_context23->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context23->query_mem_tracker()->consume(1024 * 1024 * 300);
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410);
+    EXPECT_EQ(wg2->min_memory_limit(), 1024 * 1024 * 100);
+    EXPECT_EQ(31 * 1024 * 1024, 
_wg_manager->revoke_memory_from_other_groups_());
+    ASSERT_FALSE(query_context21->is_cancelled());
+    ASSERT_FALSE(query_context22->is_cancelled());
+    ASSERT_TRUE(query_context23->is_cancelled());
+    // Although query23 is cancelled, but it is not removed from workload 
group2, so that it still occupy memory usage.
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410);
+    // clear cancelled query from workload group.
+    wg2->clear_cancelled_resource_ctx();
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 110);
+    // todo 应该是cancel 最大的query
+
     auto query_context24 = _generate_on_query(wg2);
-    auto query_context25 = _generate_on_query(wg2);
-    auto query_context26 = _generate_on_query(wg2);
-    query_context21->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024);
-    query_context21->query_mem_tracker()->consume(1024 * 1024 * 1024);
-    query_context22->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
-    query_context22->query_mem_tracker()->consume(1024 * 1024 * 64);
-    query_context23->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024);
-    query_context23->query_mem_tracker()->consume(1024 * 1024 * 10);
-    query_context24->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024);
-    query_context24->query_mem_tracker()->consume(1024);
-    query_context25->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 512);
-    query_context25->query_mem_tracker()->consume(1024 * 1024 * 1024);
-    query_context26->resource_ctx()->memory_context()->set_mem_limit(1024L * 
1024 * 1024 * 100);
-    query_context26->query_mem_tracker()->consume(1024 * 1024 * 1024);
+    query_context24->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context24->query_mem_tracker()->consume(1024 * 1024 * 300);
+    wg2->refresh_memory_usage();
+    EXPECT_EQ(wg2->total_mem_used(), 1024 * 1024 * 410); // WG2 exceed 310MB
 
     // wg3 is overcommited, some query is overcommited
     auto query_context31 = _generate_on_query(wg3);
-    auto query_context32 = _generate_on_query(wg3);
-    auto query_context33 = _generate_on_query(wg3);
-    query_context31->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024);
-    query_context31->query_mem_tracker()->consume(1024 * 1024 * 1024);
-    query_context32->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
-    query_context32->query_mem_tracker()->consume(1024 * 1024 * 512);
-    query_context33->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 512);
-    query_context33->query_mem_tracker()->consume(1024 * 1024 * 1024);
-
-    // wg4 not overcommited, query is overcommited
-    auto query_context41 = _generate_on_query(wg4);
-    query_context41->resource_ctx()->memory_context()->set_mem_limit(1024L * 
1024);
-    query_context41->query_mem_tracker()->consume(1024L * 1024 * 1024 * 9);
-
-    // wg5 disable overcommited, query is overcommited
-    auto query_context51 = _generate_on_query(wg5);
-    query_context51->resource_ctx()->memory_context()->set_mem_limit(1024L * 
1024);
-    query_context51->query_mem_tracker()->consume(1024L * 1024 * 1024 * 99);
+    query_context31->resource_ctx()->memory_context()->set_mem_limit(1024 * 
1024 * 1024);
+    query_context31->query_mem_tracker()->consume(1024 * 1024 * 500);
+    wg3->refresh_memory_usage();
+    EXPECT_EQ(wg3->total_mem_used(), 1024 * 1024 * 500); // WG3 exceed 400MB
+
+    EXPECT_EQ(40 * 1024 * 1024, 
_wg_manager->revoke_memory_from_other_groups_());
 
     wg1->refresh_memory_usage();
     wg2->refresh_memory_usage();
     wg3->refresh_memory_usage();
-    wg4->refresh_memory_usage();
-    wg5->refresh_memory_usage();
-
-    // step1, wg2 is overcommited largest, cancel some overcommited query, 
freed memory less than need_free_mem.
-    // step2, wg3 is less overcommited than wg2, cancel some overcommited 
query, terminate after freed memory is greater than need_free_mem.
-    // query41 in wg4 has largest overcommited, but wg4 not overcommited, so 
not cancel query41.
-    EXPECT_EQ(_wg_manager->revoke_memory_from_other_overcommited_groups_(
-                      query_context11->resource_ctx(), 1024L * 1024 * 1024 * 3 
+ 1),
-              1024L * 1024 * 1024 * 4);
 
+    ASSERT_TRUE(query_context31->is_cancelled());
+    // query31 is still in wg3, so that it is not cancel again.
+    EXPECT_EQ(40 * 1024 * 1024, 
_wg_manager->revoke_memory_from_other_groups_());
     ASSERT_FALSE(query_context11->is_cancelled());
-    ASSERT_TRUE(query_context21->is_cancelled());
+    ASSERT_FALSE(query_context21->is_cancelled());
     ASSERT_FALSE(query_context22->is_cancelled());
-    ASSERT_FALSE(query_context23->is_cancelled());
     ASSERT_FALSE(query_context24->is_cancelled());
-    ASSERT_TRUE(query_context25->is_cancelled());
-    ASSERT_TRUE(query_context26->is_cancelled());
     ASSERT_TRUE(query_context31->is_cancelled());
-    ASSERT_FALSE(query_context32->is_cancelled());
-    ASSERT_FALSE(query_context33->is_cancelled());
-    ASSERT_FALSE(query_context41->is_cancelled());
-    ASSERT_FALSE(query_context51->is_cancelled());
+
+    // remove query31 from wg
+    wg3->clear_cancelled_resource_ctx();
+
+    wg1->refresh_memory_usage();
+    wg2->refresh_memory_usage();
+    wg3->refresh_memory_usage();
+    EXPECT_EQ(wg3->total_mem_used(), 0); // WG3 exceed 400MB
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 57a80e18c85..b6f2ef2fbd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -587,7 +587,6 @@ public class SchemaTable extends Table {
                                     .column("CPU_USAGE_PERCENT", 
ScalarType.createType(PrimitiveType.DOUBLE))
                                     .column("LOCAL_SCAN_BYTES_PER_SECOND", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("REMOTE_SCAN_BYTES_PER_SECOND", 
ScalarType.createType(PrimitiveType.BIGINT))
-                                    .column("WRITE_BUFFER_USAGE_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .build())
             )
             .put("file_cache_statistics",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index f21f6c2dfce..f8e756f1fa2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -68,8 +68,6 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String ENABLE_MEMORY_OVERCOMMIT = 
"enable_memory_overcommit";
 
-    public static final String WRITE_BUFFER_RATIO = "write_buffer_ratio";
-
     public static final String MAX_CONCURRENCY = "max_concurrency";
 
     public static final String MAX_QUEUE_SIZE = "max_queue_size";
@@ -107,7 +105,7 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             .add(MAX_MEMORY_PERCENT).add(MIN_MEMORY_PERCENT)
             .add(MAX_CONCURRENCY).add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT)
             
.add(SCAN_THREAD_NUM).add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
-            
.add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK).add(WRITE_BUFFER_RATIO)
+            .add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK)
             
.add(COMPUTE_GROUP).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
             .add(SLOT_MEMORY_POLICY).build();
 
@@ -135,7 +133,6 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND, 
"-1");
     }
 
-    public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20;
     public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "none";
     public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES = 
new HashSet<String>() {{
             add("none");
@@ -168,16 +165,6 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         this.properties = properties;
         this.version = version;
 
-        if (properties.containsKey(WRITE_BUFFER_RATIO)) {
-            String loadBufLimitStr = properties.get(WRITE_BUFFER_RATIO);
-            if (loadBufLimitStr.endsWith("%")) {
-                loadBufLimitStr = loadBufLimitStr.substring(0, 
loadBufLimitStr.length() - 1);
-            }
-            this.properties.put(WRITE_BUFFER_RATIO, loadBufLimitStr);
-        } else {
-            this.properties.put(WRITE_BUFFER_RATIO, 
WRITE_BUFFER_RATIO_DEFAULT_VALUE + "");
-        }
-
         if (properties.containsKey(SLOT_MEMORY_POLICY)) {
             String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
             this.properties.put(SLOT_MEMORY_POLICY, slotPolicy);
@@ -378,25 +365,6 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                     + MIN_MEMORY_PERCENT + " " + minMemPercent);
         }
 
-        if (properties.containsKey(WRITE_BUFFER_RATIO)) {
-            String writeBufSizeStr = properties.get(WRITE_BUFFER_RATIO);
-            String memLimitErr = WRITE_BUFFER_RATIO + " " + writeBufSizeStr
-                    + " requires a positive int number.";
-            if (writeBufSizeStr.endsWith("%")) {
-                writeBufSizeStr = writeBufSizeStr.substring(0, 
writeBufSizeStr.length() - 1);
-            }
-            try {
-                if (Integer.parseInt(writeBufSizeStr) < 0) {
-                    throw new DdlException(memLimitErr);
-                }
-            } catch (NumberFormatException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(memLimitErr, e);
-                }
-                throw new DdlException(memLimitErr);
-            }
-        }
-
         if (properties.containsKey(SLOT_MEMORY_POLICY)) {
             String value = properties.get(SLOT_MEMORY_POLICY).toLowerCase();
             if (!AVAILABLE_SLOT_MEMORY_POLICY_VALUES.contains(value)) {
@@ -741,10 +709,6 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             tWorkloadGroupInfo.setMinMemoryPercent(this.getMinMemoryPercent());
         }
 
-        String writeBufferRatioStr = properties.get(WRITE_BUFFER_RATIO);
-        if (writeBufferRatioStr != null) {
-            
tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr));
-        }
         String slotMemoryPolicyStr = properties.get(SLOT_MEMORY_POLICY);
         if (slotMemoryPolicyStr != null) {
             
tWorkloadGroupInfo.setSlotMemoryPolicy(findSlotPolicyValueByString(slotMemoryPolicyStr));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 967cb516907..b7cbb082187 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -85,7 +85,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             .add(WorkloadGroup.COMPUTE_GROUP)
             .add(WorkloadGroup.READ_BYTES_PER_SECOND)
             .add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
-            .add(WorkloadGroup.WRITE_BUFFER_RATIO)
             .add(WorkloadGroup.SLOT_MEMORY_POLICY)
             .add(QueryQueue.RUNNING_QUERY_NUM)
             .add(QueryQueue.WAITING_QUERY_NUM)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to