This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 60a2fa7dea [Improvement](compaction) copy row in batch in 
VCollectIterator&VGenericIterator (#12214)
60a2fa7dea is described below

commit 60a2fa7deac959ff34c3aefec1f560b4cfc18775
Author: yixiutt <[email protected]>
AuthorDate: Thu Sep 1 10:20:17 2022 +0800

    [Improvement](compaction) copy row in batch in 
VCollectIterator&VGenericIterator (#12214)
    
    In VCollectIterator&VGenericIterator, use insert_range_from to copy rows
    in a block which is continuous to save cpu cost.
    
    If rows in rowset and segment are non overlapping, this whill improve 30%
    throughput of compaction.If rows are completely overlapping such as load two
    same files, the throughput goes nearly same as before.
    
    Co-authored-by: yixiutt <[email protected]>
---
 be/src/olap/compaction.cpp             |  2 ++
 be/src/vec/olap/vcollect_iterator.cpp  | 48 ++++++++++++++++++++++++-----
 be/src/vec/olap/vgeneric_iterators.cpp | 55 +++++++++++++++++++++++++++++-----
 3 files changed, 90 insertions(+), 15 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a71b441bfd..ec83adbed2 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -231,6 +231,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
               << ". tablet=" << _tablet->full_name() << ", output_version=" << 
_output_version
               << ", current_max_version=" << current_max_version
               << ", disk=" << _tablet->data_dir()->path() << ", segments=" << 
segments_num
+              << ", input_row_num=" << _input_row_num
+              << ", output_row_num=" << _output_rowset->num_rows()
               << ". elapsed time=" << watch.get_elapse_second()
               << "s. cumulative_compaction_policy="
               << _tablet->cumulative_compaction_policy()->name() << ".";
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 27e39ced95..159e59be6b 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -444,20 +444,29 @@ Status 
VCollectIterator::Level1Iterator::_merge_next(Block* block) {
     auto target_columns = block->mutate_columns();
     size_t column_count = block->columns();
     IteratorRowRef cur_row = _ref;
+    IteratorRowRef pre_row_ref = _ref;
+
     if (UNLIKELY(_reader->_reader_context.record_rowids)) {
         _block_row_locations.resize(_batch_size);
     }
+    int continuous_row_in_block = 0;
     do {
-        const auto& src_block = cur_row.block;
-        assert(src_block->columns() == column_count);
-        for (size_t i = 0; i < column_count; ++i) {
-            
target_columns[i]->insert_from(*(src_block->get_by_position(i).column),
-                                           cur_row.row_pos);
-        }
         if (UNLIKELY(_reader->_reader_context.record_rowids)) {
             _block_row_locations[target_block_row] = 
_cur_child->current_row_location();
         }
         ++target_block_row;
+        ++continuous_row_in_block;
+        // cur block finished, copy before merge_next cause merge_next will
+        // clear block column data
+        if (pre_row_ref.row_pos + continuous_row_in_block == 
pre_row_ref.block->rows()) {
+            const auto& src_block = pre_row_ref.block;
+            for (size_t i = 0; i < column_count; ++i) {
+                
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+                                                     pre_row_ref.row_pos, 
continuous_row_in_block);
+            }
+            continuous_row_in_block = 0;
+            pre_row_ref.block = nullptr;
+        }
         auto res = _merge_next(&cur_row);
         if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
             if (UNLIKELY(_reader->_reader_context.record_rowids)) {
@@ -470,7 +479,32 @@ Status 
VCollectIterator::Level1Iterator::_merge_next(Block* block) {
             LOG(WARNING) << "next failed: " << res;
             return res;
         }
-    } while (target_block_row < _batch_size);
+        if (target_block_row >= _batch_size) {
+            if (continuous_row_in_block > 0) {
+                const auto& src_block = pre_row_ref.block;
+                for (size_t i = 0; i < column_count; ++i) {
+                    
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+                                                         pre_row_ref.row_pos,
+                                                         
continuous_row_in_block);
+                }
+            }
+            return Status::OK();
+        }
+        if (continuous_row_in_block == 0) {
+            pre_row_ref = _ref;
+            continue;
+        }
+        // copy row if meet a new block
+        if (cur_row.block != pre_row_ref.block) {
+            const auto& src_block = pre_row_ref.block;
+            for (size_t i = 0; i < column_count; ++i) {
+                
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+                                                     pre_row_ref.row_pos, 
continuous_row_in_block);
+            }
+            continuous_row_in_block = 0;
+            pre_row_ref = cur_row;
+        }
+    } while (true);
 
     return Status::OK();
 }
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 83fdcbb105..dd9fd28963 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -187,9 +187,13 @@ public:
         return result;
     }
 
-    void copy_row(vectorized::Block* block) {
+    // `advanced = false` when current block finished
+    void copy_rows(vectorized::Block* block, bool advanced = true) {
         vectorized::Block& src = _block;
         vectorized::Block& dst = *block;
+        if (_cur_batch_num == 0 || _index_in_block - _cur_batch_num < 0) {
+            return;
+        }
 
         for (size_t i = 0; i < _num_columns; ++i) {
             auto& s_col = src.get_by_position(i);
@@ -199,8 +203,14 @@ public:
             vectorized::ColumnPtr& d_cp = d_col.column;
 
             //copy a row to dst block column by column
-            ((vectorized::IColumn&)(*d_cp)).insert_from(*s_cp, 
_index_in_block);
+            size_t start = _index_in_block - _cur_batch_num + 1;
+            if (advanced) {
+                start--;
+            }
+            DCHECK(start >= 0);
+            ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start, 
_cur_batch_num);
         }
+        _cur_batch_num = 0;
     }
 
     RowLocation current_row_location() {
@@ -224,6 +234,17 @@ public:
 
     void set_skip(bool skip) const { _skip = skip; }
 
+    void add_cur_batch() { _cur_batch_num++; }
+
+    void reset_cur_batch() { _cur_batch_num = 0; }
+
+    bool is_cur_block_finished() {
+        if (_index_in_block == _block.rows() - 1) {
+            return true;
+        }
+        return false;
+    }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -246,6 +267,7 @@ private:
     std::vector<uint32_t>* _compare_columns;
     std::vector<RowLocation> _block_row_locations;
     bool _record_rowids = false;
+    size_t _cur_batch_num = 0;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -344,7 +366,7 @@ private:
 
     VMergeHeap _merge_heap;
 
-    int block_row_max = 0;
+    int _block_row_max = 0;
     int _sequence_id_idx = -1;
     bool _is_unique = false;
     bool _is_reverse = false;
@@ -372,31 +394,48 @@ Status VMergeIterator::init(const StorageReadOptions& 
opts) {
 
     _origin_iters.clear();
 
-    block_row_max = opts.block_row_max;
+    _block_row_max = opts.block_row_max;
 
     return Status::OK();
 }
 
 Status VMergeIterator::next_batch(vectorized::Block* block) {
     if (UNLIKELY(_record_rowids)) {
-        _block_row_locations.resize(block_row_max);
+        _block_row_locations.resize(_block_row_max);
     }
     size_t row_idx = 0;
-    while (block->rows() < block_row_max) {
+    VMergeIteratorContext* pre_ctx = nullptr;
+    while (block->rows() < _block_row_max) {
         if (_merge_heap.empty()) break;
 
         auto ctx = _merge_heap.top();
         _merge_heap.pop();
 
         if (!ctx->need_skip()) {
-            // copy current row to block
-            ctx->copy_row(block);
+            ctx->add_cur_batch();
+            if (pre_ctx != ctx) {
+                if (pre_ctx) {
+                    pre_ctx->copy_rows(block);
+                }
+                pre_ctx = ctx;
+            }
             if (UNLIKELY(_record_rowids)) {
                 _block_row_locations[row_idx] = ctx->current_row_location();
             }
             row_idx++;
+            if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
+                // current block finished, ctx not advance
+                // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
+                ctx->copy_rows(block, false);
+                pre_ctx = nullptr;
+            }
         } else if (_merged_rows != nullptr) {
             (*_merged_rows)++;
+            // need skip cur row, so flush rows in pre_ctx
+            if (pre_ctx) {
+                pre_ctx->copy_rows(block);
+                pre_ctx = nullptr;
+            }
         }
 
         RETURN_IF_ERROR(ctx->advance());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to