This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 33fd965b5c [feature-wip](resource-group) Supports memory soft
isolation of resource group (#19802)
33fd965b5c is described below
commit 33fd965b5cd40cadcb7f8772cb9cf1dba3b22ee5
Author: luozenglin <[email protected]>
AuthorDate: Sun May 21 19:33:57 2023 +0800
[feature-wip](resource-group) Supports memory soft isolation of resource
group (#19802)
create resource group name properties(
'enable_memory_overcommit' = 'true' // whether to enable memory soft isolation
)
---
be/src/common/daemon.cpp | 3 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 14 +---
be/src/runtime/memory/mem_tracker_limiter.h | 3 +-
be/src/runtime/task_group/task_group.cpp | 47 ++++++++----
be/src/runtime/task_group/task_group.h | 22 +++++-
be/src/runtime/task_group/task_group_manager.cpp | 18 ++---
be/src/runtime/task_group/task_group_manager.h | 3 +-
be/src/util/mem_info.cpp | 84 ++++++++++++++++++++--
be/src/util/mem_info.h | 4 ++
.../resource/resourcegroup/ResourceGroup.java | 14 +++-
.../resource/resourcegroup/ResourceGroupMgr.java | 3 +-
11 files changed, 167 insertions(+), 48 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 184d74035f..82c7309766 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -230,7 +230,8 @@ void Daemon::memory_gc_thread() {
auto sys_mem_available = doris::MemInfo::sys_mem_available();
auto proc_mem_no_allocator_cache =
doris::MemInfo::proc_mem_no_allocator_cache();
- auto tg_free_mem =
taskgroup::TaskGroupManager::instance()->memory_limit_gc();
+ // GC excess memory for resource groups that not enable overcommit
+ auto tg_free_mem = doris::MemInfo::tg_hard_memory_limit_gc();
sys_mem_available += tg_free_mem;
proc_mem_no_allocator_cache -= tg_free_mem;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 495ece7981..fa99a31cd5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -487,21 +487,13 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}
int64_t MemTrackerLimiter::tg_memory_limit_gc(
- uint64_t id, const std::string& name, int64_t memory_limit,
+ int64_t need_free_mem, int64_t used_memory, uint64_t id, const
std::string& name,
+ int64_t memory_limit,
std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups)
{
- int64_t used_memory = 0;
- for (auto& mem_tracker_group : tracker_limiter_groups) {
- std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
- for (const auto& tracker : mem_tracker_group.trackers) {
- used_memory += tracker->consumption();
- }
- }
-
- if (used_memory <= memory_limit) {
+ if (need_free_mem <= 0) {
return 0;
}
- int64_t need_free_mem = used_memory - memory_limit;
int64_t freed_mem = 0;
constexpr auto query_type = MemTrackerLimiter::Type::QUERY;
auto cancel_str = [id, &name, memory_limit, used_memory](int64_t
mem_consumption,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 0811a898d9..7dd39c62f4 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -200,7 +200,8 @@ public:
}
static int64_t tg_memory_limit_gc(
- uint64_t id, const std::string& name, int64_t memory_limit,
+ int64_t request_free_memory, int64_t used_memory, uint64_t id,
const std::string& name,
+ int64_t memory_limit,
std::vector<taskgroup::TgTrackerLimiterGroup>&
tracker_limiter_groups);
// only for Type::QUERY or Type::LOAD.
diff --git a/be/src/runtime/task_group/task_group.cpp
b/be/src/runtime/task_group/task_group.cpp
index 16a6356f30..ea4fe90632 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -30,7 +30,6 @@
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
-#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
@@ -39,6 +38,7 @@ namespace taskgroup {
const static std::string CPU_SHARE = "cpu_share";
const static std::string MEMORY_LIMIT = "memory_limit";
+const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
pipeline::PipelineTask* TaskGroupEntity::take() {
if (_queue.empty()) {
@@ -81,15 +81,18 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_name(tg_info.name),
_cpu_share(tg_info.cpu_share),
_memory_limit(tg_info.memory_limit),
+ _enable_memory_overcommit(tg_info.enable_memory_overcommit),
_version(tg_info.version),
_task_entity(this),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
std::string TaskGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
- return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit =
{}, version = {}]",
- _id, _name, cpu_share(),
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
- _version);
+ return fmt::format(
+ "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {},
enable_memory_overcommit = "
+ "{}, version = {}]",
+ _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit,
TUnit::BYTES),
+ _enable_memory_overcommit ? "true" : "false", _version);
}
void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -108,6 +111,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo&
tg_info) {
_name = tg_info.name;
_version = tg_info.version;
_memory_limit = tg_info.memory_limit;
+ _enable_memory_overcommit = tg_info.enable_memory_overcommit;
if (_cpu_share != tg_info.cpu_share) {
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
tg_info, shared_from_this());
@@ -119,6 +123,17 @@ void TaskGroup::update_cpu_share_unlock(const
TaskGroupInfo& tg_info) {
_cpu_share = tg_info.cpu_share;
}
+int64_t TaskGroup::memory_used() {
+ int64_t used_memory = 0;
+ for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+ for (const auto& tracker : mem_tracker_group.trackers) {
+ used_memory += tracker->consumption();
+ }
+ }
+ return used_memory;
+}
+
void TaskGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
@@ -131,16 +146,14 @@ void
TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> me
_mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr);
}
-int64_t TaskGroup::memory_limit_gc() {
- std::string name;
- int64_t memory_limit;
- {
- std::shared_lock<std::shared_mutex> rl {_mutex};
- name = _name;
- memory_limit = _memory_limit;
- }
- return MemTrackerLimiter::tg_memory_limit_gc(_id, name, memory_limit,
- _mem_tracker_limiter_pool);
+void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ tg_info->id = _id;
+ tg_info->name = _name;
+ tg_info->cpu_share = _cpu_share;
+ tg_info->memory_limit = _memory_limit;
+ tg_info->enable_memory_overcommit = _enable_memory_overcommit;
+ tg_info->version = _version;
}
Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup&
resource_group,
@@ -174,6 +187,12 @@ Status TaskGroupInfo::parse_group_info(const
TPipelineResourceGroup& resource_gr
return Status::InternalError(ss.str());
}
task_group_info->memory_limit = mem_limit;
+
+ auto enable_memory_overcommit_iter =
resource_group.properties.find(ENABLE_MEMORY_OVERCOMMIT);
+ task_group_info->enable_memory_overcommit =
+ enable_memory_overcommit_iter != resource_group.properties.end() &&
+ enable_memory_overcommit_iter->second ==
+ "true" /* fe guarantees it is 'true' or 'false' */;
return Status::OK();
}
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index a7854158d2..4fe33799b9 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -87,6 +87,18 @@ public:
uint64_t id() const { return _id; }
+ bool enable_memory_overcommit() const { // true: memory limit is soft (overcommit allowed)
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _enable_memory_overcommit;
+ }
+
+ int64_t memory_limit() const {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _memory_limit; // bytes; returned as int64_t so the limit is not narrowed
+ }
+
+ int64_t memory_used(); // sum of consumption() over all trackers in the pool
+
std::string debug_string() const;
void check_and_update(const TaskGroupInfo& tg_info);
@@ -97,7 +109,11 @@ public:
void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
- int64_t memory_limit_gc();
+ void task_group_info(TaskGroupInfo* tg_info) const;
+
+ std::vector<TgTrackerLimiterGroup>& mem_tracker_limiter_pool() {
+ return _mem_tracker_limiter_pool;
+ }
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
@@ -105,6 +121,7 @@ private:
std::string _name;
std::atomic<uint64_t> _cpu_share;
int64_t _memory_limit; // bytes
+ bool _enable_memory_overcommit;
int64_t _version;
TaskGroupEntity _task_entity;
@@ -117,8 +134,9 @@ struct TaskGroupInfo {
uint64_t id;
std::string name;
uint64_t cpu_share;
- int64_t version;
int64_t memory_limit;
+ bool enable_memory_overcommit;
+ int64_t version;
static Status parse_group_info(const TPipelineResourceGroup&
resource_group,
TaskGroupInfo* task_group_info);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index c741a0bffd..552ab2c0a9 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -54,20 +54,14 @@ TaskGroupPtr
TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& tas
return new_task_group;
}
-int64_t TaskGroupManager::memory_limit_gc() {
- int64_t total_free_memory = 0;
- std::vector<TaskGroupPtr> task_groups;
- {
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- task_groups.reserve(_task_groups.size());
- for (const auto& [id, task_group] : _task_groups) {
- task_groups.push_back(task_group);
+void TaskGroupManager::get_resource_groups(const std::function<bool(const
TaskGroupPtr& ptr)>& pred,
+ std::vector<TaskGroupPtr>*
task_groups) {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (const auto& [id, task_group] : _task_groups) {
+ if (pred(task_group)) {
+ task_groups->push_back(task_group);
}
}
- for (const auto& task_group : task_groups) {
- total_free_memory += task_group->memory_limit_gc();
- }
- return total_free_memory;
}
} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index c053a3a45d..0b7472438c 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -33,7 +33,8 @@ public:
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo&
task_group_info);
- int64_t memory_limit_gc();
+ void get_resource_groups(const std::function<bool(const TaskGroupPtr&
ptr)>& pred,
+ std::vector<TaskGroupPtr>* task_groups);
private:
std::shared_mutex _group_mutex;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 3a145f8246..c281832ed5 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -43,6 +43,8 @@
#include "olap/segment_loader.h"
#include "runtime/memory/chunk_allocator.h"
#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/task_group/task_group.h"
+#include "runtime/task_group/task_group_manager.h"
#include "util/cgroup_util.h"
#include "util/defer_op.h"
#include "util/parse_util.h"
@@ -132,7 +134,8 @@ void MemInfo::process_cache_gc(int64_t& freed_mem) {
}
// step1: free all cache
-// step2: free top overcommit query, if enable query memroy overcommit
+// step2: free resource groups memory that enable overcommit
+// step3: free global top overcommit query, if enable query memroy overcommit
// TODO Now, the meaning is different from java minor gc + full gc, more like
small gc + large gc.
bool MemInfo::process_minor_gc() {
MonotonicStopWatch watch;
@@ -154,6 +157,11 @@ bool MemInfo::process_minor_gc() {
// TODO add freed_mem
SegmentLoader::instance()->prune();
+ freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem);
+ if (freed_mem > _s_process_minor_gc_size) {
+ return true;
+ }
+
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"Before free top memory overcommit query in Minor GC",
MemTrackerLimiter::Type::QUERY);
if (config::enable_query_memroy_overcommit) {
@@ -167,9 +175,10 @@ bool MemInfo::process_minor_gc() {
}
// step1: free all cache
-// step2: free top memory query
-// step3: free top overcommit load, load retries are more expensive, So cancel
at the end.
-// step4: free top memory load
+// step2: free resource groups memory that enable overcommit
+// step3: free global top memory query
+// step4: free top overcommit load, load retries are more expensive, So cancel
at the end.
+// step5: free top memory load
bool MemInfo::process_full_gc() {
MonotonicStopWatch watch;
watch.start();
@@ -197,6 +206,11 @@ bool MemInfo::process_full_gc() {
}
}
+ freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem);
+ if (freed_mem > _s_process_full_gc_size) {
+ return true;
+ }
+
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top
memory query in Full GC",
MemTrackerLimiter::Type::QUERY);
freed_mem +=
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
@@ -225,6 +239,68 @@ bool MemInfo::process_full_gc() {
return false;
}
+int64_t MemInfo::tg_hard_memory_limit_gc() {
+ std::vector<taskgroup::TaskGroupPtr> task_groups;
+ taskgroup::TaskGroupManager::instance()->get_resource_groups(
+ [](const taskgroup::TaskGroupPtr& task_group) {
+ return !task_group->enable_memory_overcommit();
+ },
+ &task_groups);
+
+ int64_t total_free_memory = 0;
+ for (const auto& task_group : task_groups) {
+ taskgroup::TaskGroupInfo tg_info;
+ task_group->task_group_info(&tg_info);
+ auto used = task_group->memory_used();
+ total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
+ used - tg_info.memory_limit, used, tg_info.id, tg_info.name,
tg_info.memory_limit,
+ task_group->mem_tracker_limiter_pool());
+ }
+ return total_free_memory;
+}
+
+int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) {
+ std::vector<taskgroup::TaskGroupPtr> task_groups;
+ taskgroup::TaskGroupManager::instance()->get_resource_groups(
+ [](const taskgroup::TaskGroupPtr& task_group) {
+ return task_group->enable_memory_overcommit();
+ },
+ &task_groups);
+
+ // Snapshot each group's info once so the excess and the GC call use the same int64_t byte limit.
+ std::vector<taskgroup::TaskGroupInfo> tg_infos(task_groups.size());
+ std::vector<int64_t> used_memorys(task_groups.size(), 0);
+ std::vector<int64_t> exceeded_memorys(task_groups.size(), 0);
+ int64_t total_exceeded_memory = 0;
+ for (size_t i = 0; i < task_groups.size(); ++i) {
+ task_groups[i]->task_group_info(&tg_infos[i]);
+ used_memorys[i] = task_groups[i]->memory_used();
+ int64_t exceeded = used_memorys[i] - tg_infos[i].memory_limit;
+ exceeded_memorys[i] = exceeded > 0 ? exceeded : 0;
+ total_exceeded_memory += exceeded_memorys[i];
+ }
+
+ int64_t total_free_memory = 0;
+ // Free every group's full excess if the request covers it all; otherwise split the request, weighted by excess.
+ bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
+ for (size_t i = 0; i < task_groups.size(); ++i) {
+ if (exceeded_memorys[i] == 0) {
+ continue;
+ }
+
+ // todo: GC according to resource group priority
+ int64_t tg_need_free_memory =
+ gc_all_exceeded ? exceeded_memorys[i]
+ : static_cast<int64_t>(static_cast<double>(exceeded_memorys[i]) /
+ total_exceeded_memory * request_free_memory);
+ const auto& tg_info = tg_infos[i];
+ total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
+ tg_need_free_memory, used_memorys[i], tg_info.id, tg_info.name,
+ tg_info.memory_limit, task_groups[i]->mem_tracker_limiter_pool());
+ }
+ return total_free_memory;
+}
+
#ifndef __APPLE__
void MemInfo::refresh_proc_meminfo() {
std::ifstream meminfo("/proc/meminfo", std::ios::in);
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index b25e194916..77aa73421f 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -123,6 +123,10 @@ public:
static bool process_minor_gc();
static bool process_full_gc();
+ static int64_t tg_hard_memory_limit_gc();
+
+ static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory);
+
// It is only used after the memory limit is exceeded. When multiple
threads are waiting for the available memory of the process,
// avoid multiple threads starting at the same time and causing OOM.
static std::atomic<int64_t> refresh_interval_memory_growth;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
index b859b95752..6d34cb7595 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
@@ -46,11 +46,13 @@ public class ResourceGroup implements Writable {
public static final String MEMORY_LIMIT = "memory_limit";
+ public static final String ENABLE_MEMORY_OVERCOMMIT =
"enable_memory_overcommit";
+
private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
CPU_SHARE).add(MEMORY_LIMIT).build();
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
- CPU_SHARE).add(MEMORY_LIMIT).build();
+ CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).build();
@SerializedName(value = "id")
private long id;
@@ -78,6 +80,9 @@ public class ResourceGroup implements Writable {
this.version = version;
String memoryLimitString = properties.get(MEMORY_LIMIT);
this.memoryLimitPercent =
Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() -
1));
+ if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
+ properties.put(ENABLE_MEMORY_OVERCOMMIT,
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
+ }
}
public static ResourceGroup create(String name, Map<String, String>
properties) throws DdlException {
@@ -129,6 +134,13 @@ public class ResourceGroup implements Writable {
LOG.debug(memLimitErr, e);
throw new DdlException(memLimitErr);
}
+
+ if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
+ String value =
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase();
+ if (!("true".equals(value) || "false".equals(value))) {
+ throw new DdlException("The value of '" +
ENABLE_MEMORY_OVERCOMMIT + "' must be true or false.");
+ }
+ }
}
public long getId() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
index d4d22790d7..f83a9a1678 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
@@ -123,7 +123,8 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
}
Map<String, String> properties = Maps.newHashMap();
properties.put(ResourceGroup.CPU_SHARE, "10");
- properties.put(ResourceGroup.MEMORY_LIMIT, "100%");
+ properties.put(ResourceGroup.MEMORY_LIMIT, "30%");
+ properties.put(ResourceGroup.ENABLE_MEMORY_OVERCOMMIT, "true");
defaultResourceGroup = ResourceGroup.create(DEFAULT_GROUP_NAME,
properties);
nameToResourceGroup.put(DEFAULT_GROUP_NAME, defaultResourceGroup);
idToResourceGroup.put(defaultResourceGroup.getId(),
defaultResourceGroup);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]