kaijchen commented on code in PR #19099:
URL: https://github.com/apache/doris/pull/19099#discussion_r1200211199
##########
be/src/olap/memtable.cpp:
##########
@@ -262,108 +228,168 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock*
new_row, RowInBlock* row_
}
// dst is non-sequence row, or dst sequence is smaller
for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid)
{
- auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
+ auto col_ptr = mutable_block.mutable_columns()[cid].get();
_agg_functions[cid]->add(row_in_skiplist->agg_places(cid),
const_cast<const
doris::vectorized::IColumn**>(&col_ptr),
new_row->_row_pos, nullptr);
}
}
-template <bool is_final>
-void MemTable::_collect_vskiplist_results() {
- if (_keys_type == KeysType::DUP_KEYS) {
- if (_schema->num_key_columns() > 0) {
- _collect_dup_table_with_keys();
- } else {
- // skip sort if the table is dup table without keys
- _collect_dup_table_without_keys();
- }
- } else {
- VecTable::Iterator it(_vec_skip_list.get());
- vectorized::Block in_block = _input_mutable_block.to_block();
- size_t idx = 0;
- for (it.SeekToFirst(); it.Valid(); it.Next()) {
- auto& block_data = in_block.get_columns_with_type_and_name();
- // move key columns
- for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
- _output_mutable_block.get_column_by_position(i)->insert_from(
- *block_data[i].column.get(), it.key()->_row_pos);
- }
- // get value columns from agg_places
- for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i)
{
- auto function = _agg_functions[i];
- auto agg_place = it.key()->agg_places(i);
- auto col_ptr =
_output_mutable_block.get_column_by_position(i).get();
- function->insert_result_into(agg_place, *col_ptr);
- if constexpr (is_final) {
- function->destroy(agg_place);
+void MemTable::prepare_block_for_flush(vectorized::Block& in_block) {
+ if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() == 0) {
+ // skip sort if the table is dup table without keys
+ _output_mutable_block.swap(_input_mutable_block);
+ return;
+ }
+ std::vector<int> row_pos_vec;
+ DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
+ row_pos_vec.reserve(in_block.rows());
+ for (int i = 0; i < _row_in_blocks.size(); i++) {
+ row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
+ }
+ _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+ row_pos_vec.data() + in_block.rows());
+}
+int MemTable::_sort() {
+ _vec_row_comparator->set_block(&_input_mutable_block);
+ auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
+ size_t same_keys_num = 0;
+ bool is_dup = (_keys_type == KeysType::DUP_KEYS);
+ // sort new rows
+ std::sort(new_row_it, _row_in_blocks.end(),
+ [this, is_dup, &same_keys_num](const RowInBlock* l, const
RowInBlock* r) -> bool {
+ auto value = (*(this->_vec_row_comparator))(l, r);
+ if (value == 0) {
+ same_keys_num++;
+ return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos
< r->_row_pos;
+ } else {
+ return value < 0;
+ }
+ });
Review Comment:
Save this lambda or use a named function to avoid repeating it twice?
Just for better readability.
##########
be/src/olap/memtable.h:
##########
@@ -178,12 +186,18 @@ class MemTable {
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
+ size_t _last_sorted_pos = 0;
+ //return number of same keys
+ int _sort();
+ template <bool is_final>
+ void finalize_one_row(RowInBlock* row, const
vectorized::ColumnsWithTypeAndName& block_data,
+ int row_pos);
template <bool is_final>
- void _collect_vskiplist_results();
- void _collect_dup_table_with_keys();
- void _collect_dup_table_without_keys();
+ void _aggregate();
+ void prepare_block_for_flush(vectorized::Block& in_block);
Review Comment:
```suggestion
void _prepare_block_for_flush(vectorized::Block& in_block);
```
##########
be/src/olap/memtable.h:
##########
@@ -178,12 +186,18 @@ class MemTable {
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
+ size_t _last_sorted_pos = 0;
+ //return number of same keys
+ int _sort();
+ template <bool is_final>
+ void finalize_one_row(RowInBlock* row, const
vectorized::ColumnsWithTypeAndName& block_data,
Review Comment:
Private method name should starts with an underscore
```suggestion
void _finalize_one_row(RowInBlock* row, const
vectorized::ColumnsWithTypeAndName& block_data,
```
##########
be/src/olap/memtable.h:
##########
@@ -178,12 +186,18 @@ class MemTable {
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
+ size_t _last_sorted_pos = 0;
+ //return number of same keys
+ int _sort();
+ template <bool is_final>
+ void finalize_one_row(RowInBlock* row, const
vectorized::ColumnsWithTypeAndName& block_data,
+ int row_pos);
template <bool is_final>
- void _collect_vskiplist_results();
- void _collect_dup_table_with_keys();
- void _collect_dup_table_without_keys();
+ void _aggregate();
+ void prepare_block_for_flush(vectorized::Block& in_block);
bool _is_first_insertion;
+ std::function<void(int64_t)> delta_writer_callback;
Review Comment:
```suggestion
std::function<void(int64_t)> _delta_writer_callback;
```
##########
be/src/olap/memtable.h:
##########
@@ -113,8 +123,8 @@ class MemTable {
Review Comment:
This can be removed.
```cpp
private:
using VecTable = SkipList<RowInBlock*, RowInBlockComparator>;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]