This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 60a2fa7dea [Improvement](compaction) copy row in batch in
VCollectIterator&VGenericIterator (#12214)
60a2fa7dea is described below
commit 60a2fa7deac959ff34c3aefec1f560b4cfc18775
Author: yixiutt <[email protected]>
AuthorDate: Thu Sep 1 10:20:17 2022 +0800
[Improvement](compaction) copy row in batch in
VCollectIterator&VGenericIterator (#12214)
In VCollectIterator&VGenericIterator, use insert_range_from to copy rows
in a block which is continuous to save cpu cost.
If rows in rowset and segment are non overlapping, this will improve
throughput of compaction by 30%. If rows are completely overlapping, such as
loading the same file twice, the throughput stays nearly the same as before.
Co-authored-by: yixiutt <[email protected]>
---
be/src/olap/compaction.cpp | 2 ++
be/src/vec/olap/vcollect_iterator.cpp | 48 ++++++++++++++++++++++++-----
be/src/vec/olap/vgeneric_iterators.cpp | 55 +++++++++++++++++++++++++++++-----
3 files changed, 90 insertions(+), 15 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a71b441bfd..ec83adbed2 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -231,6 +231,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
<< ". tablet=" << _tablet->full_name() << ", output_version=" <<
_output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" <<
segments_num
+ << ", input_row_num=" << _input_row_num
+ << ", output_row_num=" << _output_rowset->num_rows()
<< ". elapsed time=" << watch.get_elapse_second()
<< "s. cumulative_compaction_policy="
<< _tablet->cumulative_compaction_policy()->name() << ".";
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 27e39ced95..159e59be6b 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -444,20 +444,29 @@ Status
VCollectIterator::Level1Iterator::_merge_next(Block* block) {
auto target_columns = block->mutate_columns();
size_t column_count = block->columns();
IteratorRowRef cur_row = _ref;
+ IteratorRowRef pre_row_ref = _ref;
+
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
_block_row_locations.resize(_batch_size);
}
+ int continuous_row_in_block = 0;
do {
- const auto& src_block = cur_row.block;
- assert(src_block->columns() == column_count);
- for (size_t i = 0; i < column_count; ++i) {
-
target_columns[i]->insert_from(*(src_block->get_by_position(i).column),
- cur_row.row_pos);
- }
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
_block_row_locations[target_block_row] =
_cur_child->current_row_location();
}
++target_block_row;
+ ++continuous_row_in_block;
+ // cur block finished, copy before merge_next cause merge_next will
+ // clear block column data
+ if (pre_row_ref.row_pos + continuous_row_in_block ==
pre_row_ref.block->rows()) {
+ const auto& src_block = pre_row_ref.block;
+ for (size_t i = 0; i < column_count; ++i) {
+
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+ pre_row_ref.row_pos,
continuous_row_in_block);
+ }
+ continuous_row_in_block = 0;
+ pre_row_ref.block = nullptr;
+ }
auto res = _merge_next(&cur_row);
if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
@@ -470,7 +479,32 @@ Status
VCollectIterator::Level1Iterator::_merge_next(Block* block) {
LOG(WARNING) << "next failed: " << res;
return res;
}
- } while (target_block_row < _batch_size);
+ if (target_block_row >= _batch_size) {
+ if (continuous_row_in_block > 0) {
+ const auto& src_block = pre_row_ref.block;
+ for (size_t i = 0; i < column_count; ++i) {
+
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+ pre_row_ref.row_pos,
+
continuous_row_in_block);
+ }
+ }
+ return Status::OK();
+ }
+ if (continuous_row_in_block == 0) {
+ pre_row_ref = _ref;
+ continue;
+ }
+ // copy row if meet a new block
+ if (cur_row.block != pre_row_ref.block) {
+ const auto& src_block = pre_row_ref.block;
+ for (size_t i = 0; i < column_count; ++i) {
+
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+ pre_row_ref.row_pos,
continuous_row_in_block);
+ }
+ continuous_row_in_block = 0;
+ pre_row_ref = cur_row;
+ }
+ } while (true);
return Status::OK();
}
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 83fdcbb105..dd9fd28963 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -187,9 +187,13 @@ public:
return result;
}
- void copy_row(vectorized::Block* block) {
+ // `advanced = false` when current block finished
+ void copy_rows(vectorized::Block* block, bool advanced = true) {
vectorized::Block& src = _block;
vectorized::Block& dst = *block;
+ if (_cur_batch_num == 0 || _index_in_block - _cur_batch_num < 0) {
+ return;
+ }
for (size_t i = 0; i < _num_columns; ++i) {
auto& s_col = src.get_by_position(i);
@@ -199,8 +203,14 @@ public:
vectorized::ColumnPtr& d_cp = d_col.column;
//copy a row to dst block column by column
- ((vectorized::IColumn&)(*d_cp)).insert_from(*s_cp,
_index_in_block);
+ size_t start = _index_in_block - _cur_batch_num + 1;
+ if (advanced) {
+ start--;
+ }
+ DCHECK(start >= 0);
+ ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start,
_cur_batch_num);
}
+ _cur_batch_num = 0;
}
RowLocation current_row_location() {
@@ -224,6 +234,17 @@ public:
void set_skip(bool skip) const { _skip = skip; }
+ void add_cur_batch() { _cur_batch_num++; }
+
+ void reset_cur_batch() { _cur_batch_num = 0; }
+
+ bool is_cur_block_finished() {
+ if (_index_in_block == _block.rows() - 1) {
+ return true;
+ }
+ return false;
+ }
+
private:
// Load next block into _block
Status _load_next_block();
@@ -246,6 +267,7 @@ private:
std::vector<uint32_t>* _compare_columns;
std::vector<RowLocation> _block_row_locations;
bool _record_rowids = false;
+ size_t _cur_batch_num = 0;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -344,7 +366,7 @@ private:
VMergeHeap _merge_heap;
- int block_row_max = 0;
+ int _block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
@@ -372,31 +394,48 @@ Status VMergeIterator::init(const StorageReadOptions&
opts) {
_origin_iters.clear();
- block_row_max = opts.block_row_max;
+ _block_row_max = opts.block_row_max;
return Status::OK();
}
Status VMergeIterator::next_batch(vectorized::Block* block) {
if (UNLIKELY(_record_rowids)) {
- _block_row_locations.resize(block_row_max);
+ _block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
- while (block->rows() < block_row_max) {
+ VMergeIteratorContext* pre_ctx = nullptr;
+ while (block->rows() < _block_row_max) {
if (_merge_heap.empty()) break;
auto ctx = _merge_heap.top();
_merge_heap.pop();
if (!ctx->need_skip()) {
- // copy current row to block
- ctx->copy_row(block);
+ ctx->add_cur_batch();
+ if (pre_ctx != ctx) {
+ if (pre_ctx) {
+ pre_ctx->copy_rows(block);
+ }
+ pre_ctx = ctx;
+ }
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
row_idx++;
+ if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
+ // current block finished, ctx not advance
+ // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
+ ctx->copy_rows(block, false);
+ pre_ctx = nullptr;
+ }
} else if (_merged_rows != nullptr) {
(*_merged_rows)++;
+ // need skip cur row, so flush rows in pre_ctx
+ if (pre_ctx) {
+ pre_ctx->copy_rows(block);
+ pre_ctx = nullptr;
+ }
}
RETURN_IF_ERROR(ctx->advance());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]