This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c9bdd64fa [fix](memory) arena support memory reuse after clear()
(#21033)
2c9bdd64fa is described below
commit 2c9bdd64fa40c2e81b8092fb7bfeb9a193f9d1b8
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Jun 21 23:27:21 2023 +0800
[fix](memory) arena support memory reuse after clear() (#21033)
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 +++
be/src/vec/columns/predicate_column.h | 2 +-
be/src/vec/common/arena.h | 23 ++++++++++++++++++++++
.../vec/exec/join/process_hash_table_probe_impl.h | 2 +-
be/src/vec/exec/join/vhash_join_node.h | 4 +++-
be/src/vec/exec/vaggregation_node.h | 17 ++++++++--------
be/src/vec/exec/vpartition_sort_node.h | 14 +++++++------
8 files changed, 50 insertions(+), 17 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c27fbad769..2ea50568fc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -129,6 +129,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
// The maximum time a thread waits for a full GC. Currently only query will
wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
+DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216");
+
// the port heartbeat service used
DEFINE_Int32(heartbeat_service_port, "9050");
// the count of heart beat service
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 983c525df0..0cd4e86302 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -169,6 +169,9 @@ DECLARE_mBool(enable_query_memory_overcommit);
// The maximum time a thread waits for a full GC. Currently only query will
wait for full gc.
DECLARE_mInt32(thread_wait_gc_max_milliseconds);
+// When the estimated size of pre-serialized keys reaches this limit, keys are serialized row by row instead of in batch.
+DECLARE_mInt64(pre_serialize_keys_limit_bytes);
+
// the port heartbeat service used
DECLARE_Int32(heartbeat_service_port);
// the count of heart beat service
diff --git a/be/src/vec/columns/predicate_column.h
b/be/src/vec/columns/predicate_column.h
index 28cb16e949..2cbf871804 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -343,7 +343,7 @@ public:
void clear() override {
data.clear();
if (_arena != nullptr) {
- _arena.reset(new Arena());
+ _arena->clear();
}
}
diff --git a/be/src/vec/common/arena.h b/be/src/vec/common/arena.h
index d98f4eeaa7..9fad70d845 100644
--- a/be/src/vec/common/arena.h
+++ b/be/src/vec/common/arena.h
@@ -277,6 +277,29 @@ public:
return res;
}
+ /**
+ * Delete all the chunks before the head, usually the head is the largest
chunk in the arena.
+ * Consider the following memory-reuse scenario:
+ * 1. first time, use arena alloc 64K memory, 4K each time, at this time,
there are 4 chunks of 4k 8k 16k 32k in arena.
+ * 2. then, clear arena, only one 32k chunk left in the arena.
+ * 3. second time, same alloc 64K memory: the 32k head chunk is filled first, then one new 64k chunk is allocated, so there are 2 chunks of 32k 64k in arena.
+ * 4. then, clear arena, only one 64k chunk left in the arena.
+ * 5. third time, same alloc 64K memory, there is still only one 64K chunk
in the arena, and the memory is fully reused.
+ *
+ * special case: if the chunk is larger than 128M, it will no longer be
expanded by a multiple of 2.
+ * If alloc 4G memory, 128M each time, then only one 128M chunk will be
reserved after clearing,
+ * and only 128M can be reused when you apply for 4G memory again.
+ */
+ void clear() {
+ if (head->prev) {
+ delete head->prev;
+ head->prev = nullptr;
+ }
+ head->pos = head->begin;
+ size_in_bytes = head->size();
+ _used_size_no_head = 0;
+ }
+
/// Size of chunks in bytes.
size_t size() const { return size_in_bytes; }
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index e595796bb5..5923dbf1c5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -181,7 +181,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (_arena) {
old_probe_keys_memory_usage = _arena->size();
}
- _arena.reset(new Arena());
+ _arena.reset(new Arena()); // TODO arena reuse by clear()?
if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
if (_probe_keys.size() < probe_rows) {
_probe_keys.resize(probe_rows);
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 02927f9724..671d0d4170 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -72,6 +72,8 @@ struct SerializedHashTableContext {
ColumnsHashing::HashMethodSerialized<typename
HashTable::value_type, Mapped, true>;
using Iter = typename HashTable::iterator;
+ SerializedHashTableContext() { _arena.reset(new Arena()); }
+
HashTable hash_table;
Iter iter;
bool inited = false;
@@ -83,7 +85,7 @@ struct SerializedHashTableContext {
keys.resize(num_rows);
}
- _arena.reset(new Arena());
+ _arena->clear();
keys_memory_usage = 0;
size_t keys_size = key_columns.size();
for (size_t i = 0; i < num_rows; ++i) {
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 8b3bb0e82d..ce7070b817 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -102,7 +102,10 @@ struct AggregationMethodSerialized {
std::vector<StringRef> keys;
size_t keys_memory_usage = 0;
AggregationMethodSerialized()
- : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr)
{}
+ : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {
+ _arena.reset(new Arena());
+ _serialize_key_arena.reset(new Arena());
+ }
using State = ColumnsHashing::HashMethodSerialized<typename
Data::value_type, Mapped, true>;
@@ -120,20 +123,19 @@ struct AggregationMethodSerialized {
}
size_t total_bytes = max_one_row_byte_size * num_rows;
- if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
+ if (total_bytes > config::pre_serialize_keys_limit_bytes) {
// reach mem limit, don't serialize in batch
- // for simplicity, we just create a new arena here.
- _arena.reset(new Arena());
+ _arena->clear();
size_t keys_size = key_columns.size();
for (size_t i = 0; i < num_rows; ++i) {
keys[i] = serialize_keys_to_pool_contiguous(i, keys_size,
key_columns, *_arena);
}
keys_memory_usage = _arena->size();
} else {
- _arena.reset();
+ _arena->clear();
if (total_bytes > _serialized_key_buffer_size) {
_serialized_key_buffer_size = total_bytes;
- _serialize_key_arena.reset(new Arena());
+ _serialize_key_arena->clear();
_serialized_key_buffer = reinterpret_cast<uint8_t*>(
_serialize_key_arena->alloc(_serialized_key_buffer_size));
}
@@ -175,7 +177,7 @@ struct AggregationMethodSerialized {
}
void reset() {
- _arena.reset();
+ _arena.reset(new Arena());
keys_memory_usage = 0;
_serialized_key_buffer_size = 0;
}
@@ -185,7 +187,6 @@ private:
uint8_t* _serialized_key_buffer;
std::unique_ptr<Arena> _serialize_key_arena;
std::unique_ptr<Arena> _arena;
- static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 *
1024; // 16M
};
using AggregatedDataWithoutKey = AggregateDataPtr;
diff --git a/be/src/vec/exec/vpartition_sort_node.h
b/be/src/vec/exec/vpartition_sort_node.h
index 4aae4a7acb..45b44da81e 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -102,7 +102,10 @@ struct PartitionMethodSerialized {
using State = ColumnsHashing::HashMethodSerialized<typename
Data::value_type, Mapped, true>;
template <typename Other>
- explicit PartitionMethodSerialized(const Other& other) : data(other.data)
{}
+ explicit PartitionMethodSerialized(const Other& other) : data(other.data) {
+ _arena.reset(new Arena());
+ _serialize_key_arena.reset(new Arena());
+ }
size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) {
if (keys.size() < num_rows) {
@@ -115,20 +118,20 @@ struct PartitionMethodSerialized {
}
size_t total_bytes = max_one_row_byte_size * num_rows;
- if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
+ if (total_bytes > config::pre_serialize_keys_limit_bytes) {
// reach mem limit, don't serialize in batch
// for simplicity, we just create a new arena here.
- _arena.reset(new Arena());
+ _arena->clear();
size_t keys_size = key_columns.size();
for (size_t i = 0; i < num_rows; ++i) {
keys[i] = serialize_keys_to_pool_contiguous(i, keys_size,
key_columns, *_arena);
}
keys_memory_usage = _arena->size();
} else {
- _arena.reset();
+ _arena->clear();
if (total_bytes > _serialized_key_buffer_size) {
_serialized_key_buffer_size = total_bytes;
- _serialize_key_arena.reset(new Arena());
+ _serialize_key_arena->clear();
_serialized_key_buffer = reinterpret_cast<uint8_t*>(
_serialize_key_arena->alloc(_serialized_key_buffer_size));
}
@@ -152,7 +155,6 @@ private:
uint8_t* _serialized_key_buffer;
std::unique_ptr<Arena> _serialize_key_arena;
std::unique_ptr<Arena> _arena;
- static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 *
1024; // 16M
};
//for string
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]