This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 171ae2892f [improvement](batch size) pass batch size of exec engine to 
storage engine (#16614)
171ae2892f is described below

commit 171ae2892f0b9a3999200f0b093a80689a3cf143
Author: TengJianPing <[email protected]>
AuthorDate: Sat Feb 11 09:01:44 2023 +0800

    [improvement](batch size) pass batch size of exec engine to storage engine 
(#16614)
    
    Currently, batch_size is not passed on to SegmentIterator; instead, 
SegmentIterator uses the hard-coded value 4096 - 32 as the max row count of a 
block.
    
    
    * fix bug
---
 be/src/olap/reader.cpp                             |  1 -
 be/src/olap/reader.h                               |  5 +++--
 be/src/olap/rowset/beta_rowset_reader.cpp          |  1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  2 +-
 be/src/vec/olap/block_reader.cpp                   | 12 ++++++------
 be/src/vec/olap/vcollect_iterator.cpp              |  6 +++---
 be/src/vec/olap/vcollect_iterator.h                |  3 ---
 be/src/vec/olap/vertical_block_reader.cpp          | 12 ++++++------
 8 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 2207836650..0d116dd0ae 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -220,7 +220,6 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params,
     _reader_context.stats = &_stats;
     _reader_context.use_page_cache = read_params.use_page_cache;
     _reader_context.sequence_id_idx = _sequence_col_idx;
-    _reader_context.batch_size = _batch_size;
     _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
     _reader_context.merged_rows = &_merged_rows;
     _reader_context.delete_bitmap = read_params.delete_bitmap;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index b9bafd6fed..3abccc8044 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -161,7 +161,9 @@ public:
                _stats.rows_vec_cond_filtered;
     }
 
-    void set_batch_size(int batch_size) { _batch_size = batch_size; }
+    void set_batch_size(int batch_size) { _reader_context.batch_size = 
batch_size; }
+
+    int batch_size() const { return _reader_context.batch_size; }
 
     const OlapReaderStatistics& stats() const { return _stats; }
     OlapReaderStatistics* mutable_stats() { return &_stats; }
@@ -237,7 +239,6 @@ protected:
     bool _filter_delete = false;
     int32_t _sequence_col_idx = -1;
     bool _direct_mode = false;
-    int _batch_size = 1024;
 
     std::vector<uint32_t> _key_cids;
     std::vector<uint32_t> _value_cids;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index c060b4c286..0f978e3c3a 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -62,6 +62,7 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     }
 
     // convert RowsetReaderContext to StorageReadOptions
+    _read_options.block_row_max = read_context->batch_size;
     _read_options.stats = _stats;
     _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
     _read_options.remaining_vconjunct_root = 
_context->remaining_vconjunct_root;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index ac1c058f27..c199038813 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1346,7 +1346,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
     uint32_t nrows_read_limit = _opts.block_row_max;
     if (UNLIKELY(_estimate_row_size)) {
         // read 100 rows to estimate average row size
-        nrows_read_limit = 100;
+        nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
     }
     _split_row_ranges.clear();
     _split_row_ranges.reserve(nrows_read_limit / 2);
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index b872a80b1f..78e271625e 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -75,7 +75,6 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params,
     _vcollect_iter.init(this, _is_rowsets_overlapping, 
read_params.read_orderby_key,
                         read_params.read_orderby_key_reverse);
 
-    _reader_context.batch_size = _batch_size;
     _reader_context.is_vec = true;
     _reader_context.push_down_agg_type_opt = 
read_params.push_down_agg_type_opt;
     for (auto& rs_reader : rs_readers) {
@@ -108,7 +107,8 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
         return;
     }
 
-    _stored_data_columns = 
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
+    _stored_data_columns =
+            
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
 
     _stored_has_null_tag.resize(_stored_data_columns.size());
     _stored_has_string_tag.resize(_stored_data_columns.size());
@@ -242,7 +242,7 @@ Status BlockReader::_agg_key_next_block(Block* block, bool* 
eof) {
         }
 
         if (!_get_next_row_same()) {
-            if (target_block_row == _batch_size) {
+            if (target_block_row == _reader_context.batch_size) {
                 break;
             }
             _agg_data_counters.push_back(_last_agg_data_counter);
@@ -274,7 +274,7 @@ Status BlockReader::_unique_key_next_block(Block* block, 
bool* eof) {
     auto target_block_row = 0;
     auto target_columns = block->mutate_columns();
     if (UNLIKELY(_reader_context.record_rowids)) {
-        _block_row_locations.resize(_batch_size);
+        _block_row_locations.resize(_reader_context.batch_size);
     }
 
     do {
@@ -301,7 +301,7 @@ Status BlockReader::_unique_key_next_block(Block* block, 
bool* eof) {
             LOG(WARNING) << "next failed: " << res;
             return res;
         }
-    } while (target_block_row < _batch_size);
+    } while (target_block_row < _reader_context.batch_size);
 
     // do filter delete row in base compaction, only base compaction need to 
do the job
     if (_filter_delete) {
@@ -351,7 +351,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns) 
{
 
     // execute aggregate when have `batch_size` column or some ref invalid soon
     bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
-    if (is_last || _stored_row_ref.size() == _batch_size) {
+    if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
         _update_agg_data(columns);
     }
 }
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 3a666dd61b..b7313210e4 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -533,7 +533,6 @@ VCollectIterator::Level1Iterator::Level1Iterator(
           _is_reverse(is_reverse),
           _skip_same(skip_same) {
     _ref.reset();
-    _batch_size = reader->_batch_size;
     // !_merge means that data are in order, so we just reverse children to 
return data in reverse
     if (!_merge && _is_reverse) {
         _children.reverse();
@@ -703,8 +702,9 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* 
block) {
         block->insert(cur_row.block->get_by_position(i).clone_empty());
     }
 
+    auto batch_size = _reader->batch_size();
     if (UNLIKELY(_reader->_reader_context.record_rowids)) {
-        _block_row_locations.resize(_batch_size);
+        _block_row_locations.resize(batch_size);
     }
     int continuous_row_in_block = 0;
     do {
@@ -736,7 +736,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* 
block) {
             LOG(WARNING) << "next failed: " << res;
             return res;
         }
-        if (target_block_row >= _batch_size) {
+        if (target_block_row >= batch_size) {
             if (continuous_row_in_block > 0) {
                 const auto& src_block = pre_row_ref.block;
                 for (size_t i = 0; i < column_count; ++i) {
diff --git a/be/src/vec/olap/vcollect_iterator.h 
b/be/src/vec/olap/vcollect_iterator.h
index 218c92e99e..3018768d4a 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -298,9 +298,6 @@ private:
         // used when `_merge == true`
         std::unique_ptr<MergeHeap> _heap;
 
-        // batch size, get from TabletReader
-        int _batch_size;
-
         std::vector<RowLocation> _block_row_locations;
     };
 
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 0bc1b98da2..13eb7831ee 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -49,7 +49,6 @@ Status VerticalBlockReader::_get_segment_iterators(const 
ReaderParams& read_para
                      << ", version:" << read_params.version;
         return res;
     }
-    _reader_context.batch_size = _batch_size;
     _reader_context.is_vec = true;
     _reader_context.is_vertical_compaction = true;
     for (auto& rs_reader : rs_readers) {
@@ -125,7 +124,8 @@ void VerticalBlockReader::_init_agg_state(const 
ReaderParams& read_params) {
         return;
     }
     DCHECK(_return_columns.size() == _next_row.block->columns());
-    _stored_data_columns = 
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
+    _stored_data_columns =
+            
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
 
     _stored_has_null_tag.resize(_stored_data_columns.size());
     _stored_has_string_tag.resize(_stored_data_columns.size());
@@ -155,7 +155,7 @@ void VerticalBlockReader::_init_agg_state(const 
ReaderParams& read_params) {
 
 Status VerticalBlockReader::init(const ReaderParams& read_params) {
     StorageReadOptions opts;
-    _batch_size = opts.block_row_max;
+    _reader_context.batch_size = opts.block_row_max;
     RETURN_NOT_OK(TabletReader::init(read_params));
 
     auto status = _init_collect_iter(read_params);
@@ -209,7 +209,7 @@ void VerticalBlockReader::_append_agg_data(MutableColumns& 
columns) {
 
     // execute aggregate when have `batch_size` column or some ref invalid soon
     bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
-    if (is_last || _stored_row_ref.size() == _batch_size) {
+    if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
         _update_agg_data(columns);
     }
 }
@@ -332,7 +332,7 @@ Status VerticalBlockReader::_agg_key_next_block(Block* 
block, bool* eof) {
         }
         DCHECK(_next_row.block->columns() == block->columns());
         if (!_next_row.is_same) {
-            if (target_block_row == _batch_size) {
+            if (target_block_row == _reader_context.batch_size) {
                 break;
             }
             _agg_data_counters.push_back(_last_agg_data_counter);
@@ -432,7 +432,7 @@ Status VerticalBlockReader::_unique_key_next_block(Block* 
block, bool* eof) {
                                            _next_row.row_pos);
         }
         ++target_block_row;
-    } while (target_block_row < _batch_size);
+    } while (target_block_row < _reader_context.batch_size);
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to