This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 643f622b515 make memtable usage counter more accurate (#47017)
643f622b515 is described below
commit 643f622b51564e6204f56f26a5b9b0d2cd7f785a
Author: yiguolei <[email protected]>
AuthorDate: Wed Jan 15 13:40:31 2025 +0800
make memtable usage counter more accurate (#47017)
---
be/src/olap/memtable.cpp | 86 ++++++++++++++++++++++++++----------------------
be/src/olap/memtable.h | 6 ++--
2 files changed, 50 insertions(+), 42 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 8593f1ef482..67b68f978a1 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -61,6 +61,10 @@ MemTable::MemTable(int64_t tablet_id,
std::shared_ptr<TabletSchema> tablet_schem
_total_size_of_aggregate_states(0) {
g_memtable_cnt << 1;
_query_thread_context.init_unlocked();
+ _mem_tracker = std::make_shared<MemTracker>();
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ _query_thread_context.query_mem_tracker->write_tracker());
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
_arena = std::make_unique<vectorized::Arena>();
_vec_row_comparator =
std::make_shared<RowInBlockComparator>(_tablet_schema);
_num_columns = _tablet_schema->num_columns();
@@ -77,7 +81,7 @@ MemTable::MemTable(int64_t tablet_id,
std::shared_ptr<TabletSchema> tablet_schem
}
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
- _mem_tracker = std::make_shared<MemTracker>();
+ _row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>();
}
void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
@@ -145,6 +149,34 @@ void MemTable::_init_agg_functions(const
vectorized::Block* block) {
MemTable::~MemTable() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
+ {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+ g_memtable_cnt << -1;
+ if (_keys_type != KeysType::DUP_KEYS) {
+ for (auto it = _row_in_blocks->begin(); it !=
_row_in_blocks->end(); it++) {
+ if (!(*it)->has_init_agg()) {
+ continue;
+ }
+ // We should release agg_places here, because they are not
released when a
+ // load is canceled.
+ for (size_t i = _tablet_schema->num_key_columns(); i <
_num_columns; ++i) {
+ auto function = _agg_functions[i];
+ DCHECK(function != nullptr);
+ function->destroy((*it)->agg_places(i));
+ }
+ }
+ }
+ std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(),
+ std::default_delete<RowInBlock>());
+ // Arena has to be destroyed after agg state, because some agg state's
memory may be
+ // allocated in arena.
+ _arena.reset();
+ _vec_row_comparator.reset();
+ _row_in_blocks.reset();
+ _agg_functions.clear();
+ _input_mutable_block.clear();
+ _output_mutable_block.clear();
+ }
if (_is_flush_success) {
// If the memtable is flush success, then its memtracker's consumption
should be 0
if (_mem_tracker->consumption() != 0 &&
config::crash_in_memory_tracker_inaccurate) {
@@ -152,28 +184,6 @@ MemTable::~MemTable() {
<< _mem_tracker->consumption();
}
}
- g_memtable_cnt << -1;
- if (_keys_type != KeysType::DUP_KEYS) {
- for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end();
it++) {
- if (!(*it)->has_init_agg()) {
- continue;
- }
- // We should release agg_places here, because they are not
released when a
- // load is canceled.
- for (size_t i = _tablet_schema->num_key_columns(); i <
_num_columns; ++i) {
- auto function = _agg_functions[i];
- DCHECK(function != nullptr);
- function->destroy((*it)->agg_places(i));
- }
- }
- }
- std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
std::default_delete<RowInBlock>());
- _arena.reset();
- _vec_row_comparator.reset();
- _row_in_blocks.clear();
- _agg_functions.clear();
- _input_mutable_block.clear();
- _output_mutable_block.clear();
}
int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock*
right) const {
@@ -227,7 +237,7 @@ Status MemTable::insert(const vectorized::Block*
input_block,
RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
row_idxs.data() + num_rows,
&_column_offset));
for (int i = 0; i < num_rows; i++) {
- _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock +
i});
+ _row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock +
i});
}
_stat.raw_rows += num_rows;
@@ -285,8 +295,8 @@ Status MemTable::_put_into_output(vectorized::Block&
in_block) {
DorisVector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
- for (int i = 0; i < _row_in_blocks.size(); i++) {
- row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
+ for (int i = 0; i < _row_in_blocks->size(); i++) {
+ row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos);
}
return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() +
in_block.rows());
@@ -297,19 +307,19 @@ size_t MemTable::_sort() {
_stat.sort_times++;
size_t same_keys_num = 0;
// sort new rows
- Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
+ Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size());
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos,
rhs->_row_pos, i, -1);
};
- _sort_one_column(_row_in_blocks, tie, cmp);
+ _sort_one_column(*_row_in_blocks, tie, cmp);
}
bool is_dup = (_keys_type == KeysType::DUP_KEYS);
// sort extra round by _row_pos to make the sort stable
auto iter = tie.iter();
while (iter.next()) {
- pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
- std::next(_row_in_blocks.begin(), iter.right()),
+ pdqsort(std::next(_row_in_blocks->begin(), iter.left()),
+ std::next(_row_in_blocks->begin(), iter.right()),
[&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) ->
bool {
return is_dup ? lhs->_row_pos > rhs->_row_pos :
lhs->_row_pos < rhs->_row_pos;
});
@@ -327,9 +337,9 @@ size_t MemTable::_sort() {
return value < 0;
}
};
- auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
- std::inplace_merge(_row_in_blocks.begin(), new_row_it,
_row_in_blocks.end(), cmp_func);
- _last_sorted_pos = _row_in_blocks.size();
+ auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos);
+ std::inplace_merge(_row_in_blocks->begin(), new_row_it,
_row_in_blocks->end(), cmp_func);
+ _last_sorted_pos = _row_in_blocks->size();
return same_keys_num;
}
@@ -483,7 +493,7 @@ void MemTable::_aggregate() {
};
if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) {
- for (RowInBlock* cur_row : _row_in_blocks) {
+ for (RowInBlock* cur_row : *_row_in_blocks) {
if (!temp_row_in_blocks.empty() &&
(*_vec_row_comparator)(prev_row, cur_row) == 0) {
if (!prev_row->has_init_agg()) {
init_for_agg(prev_row);
@@ -542,7 +552,7 @@ void MemTable::_aggregate() {
auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>(
mutable_block.mutable_columns()[_skip_bitmap_col_idx].get())
->get_data();
- for (auto* cur_row : _row_in_blocks) {
+ for (auto* cur_row : *_row_in_blocks) {
const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos];
bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id);
// compare keys, the keys of row_with_seq_col and row_with_seq_col
is the same,
@@ -576,8 +586,8 @@ void MemTable::_aggregate() {
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
_output_mutable_block.clear_column_data();
- _row_in_blocks = temp_row_in_blocks;
- _last_sorted_pos = _row_in_blocks.size();
+ *_row_in_blocks = temp_row_in_blocks;
+ _last_sorted_pos = _row_in_blocks->size();
}
}
@@ -648,8 +658,6 @@ Status
MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
RETURN_IF_ERROR(_sort_by_cluster_keys());
}
_input_mutable_block.clear();
- // After to block, all data in arena is saved in the block
- _arena.reset();
*res = vectorized::Block::create_unique(_output_mutable_block.to_block());
return Status::OK();
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index f4b1de45272..5e98bd51a74 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -263,10 +263,10 @@ private:
bool _is_first_insertion;
void _init_agg_functions(const vectorized::Block* block);
- DorisVector<vectorized::AggregateFunctionPtr> _agg_functions;
- DorisVector<size_t> _offsets_of_aggregate_states;
+ std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
+ std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
- DorisVector<RowInBlock*> _row_in_blocks;
+ std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks;
size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]