This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 171ae2892f [improvement](batch size) pass batch size of exec engine to
storage engine (#16614)
171ae2892f is described below
commit 171ae2892f0b9a3999200f0b093a80689a3cf143
Author: TengJianPing <[email protected]>
AuthorDate: Sat Feb 11 09:01:44 2023 +0800
[improvement](batch size) pass batch size of exec engine to storage engine
(#16614)
Currently, batch_size is not passed on to SegmentIterator; instead,
SegmentIterator uses the hard-coded value 4096 - 32 as the maximum row count
of a block.
* fix bug
---
be/src/olap/reader.cpp | 1 -
be/src/olap/reader.h | 5 +++--
be/src/olap/rowset/beta_rowset_reader.cpp | 1 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 2 +-
be/src/vec/olap/block_reader.cpp | 12 ++++++------
be/src/vec/olap/vcollect_iterator.cpp | 6 +++---
be/src/vec/olap/vcollect_iterator.h | 3 ---
be/src/vec/olap/vertical_block_reader.cpp | 12 ++++++------
8 files changed, 20 insertions(+), 22 deletions(-)
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 2207836650..0d116dd0ae 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -220,7 +220,6 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params,
_reader_context.stats = &_stats;
_reader_context.use_page_cache = read_params.use_page_cache;
_reader_context.sequence_id_idx = _sequence_col_idx;
- _reader_context.batch_size = _batch_size;
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
_reader_context.merged_rows = &_merged_rows;
_reader_context.delete_bitmap = read_params.delete_bitmap;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index b9bafd6fed..3abccc8044 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -161,7 +161,9 @@ public:
_stats.rows_vec_cond_filtered;
}
- void set_batch_size(int batch_size) { _batch_size = batch_size; }
+ void set_batch_size(int batch_size) { _reader_context.batch_size =
batch_size; }
+
+ int batch_size() const { return _reader_context.batch_size; }
const OlapReaderStatistics& stats() const { return _stats; }
OlapReaderStatistics* mutable_stats() { return &_stats; }
@@ -237,7 +239,6 @@ protected:
bool _filter_delete = false;
int32_t _sequence_col_idx = -1;
bool _direct_mode = false;
- int _batch_size = 1024;
std::vector<uint32_t> _key_cids;
std::vector<uint32_t> _value_cids;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index c060b4c286..0f978e3c3a 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -62,6 +62,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
// convert RowsetReaderContext to StorageReadOptions
+ _read_options.block_row_max = read_context->batch_size;
_read_options.stats = _stats;
_read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
_read_options.remaining_vconjunct_root =
_context->remaining_vconjunct_root;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index ac1c058f27..c199038813 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1346,7 +1346,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
uint32_t nrows_read_limit = _opts.block_row_max;
if (UNLIKELY(_estimate_row_size)) {
// read 100 rows to estimate average row size
- nrows_read_limit = 100;
+ nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
}
_split_row_ranges.clear();
_split_row_ranges.reserve(nrows_read_limit / 2);
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index b872a80b1f..78e271625e 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -75,7 +75,6 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params,
_vcollect_iter.init(this, _is_rowsets_overlapping,
read_params.read_orderby_key,
read_params.read_orderby_key_reverse);
- _reader_context.batch_size = _batch_size;
_reader_context.is_vec = true;
_reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
for (auto& rs_reader : rs_readers) {
@@ -108,7 +107,8 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
return;
}
- _stored_data_columns =
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
+ _stored_data_columns =
+
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_string_tag.resize(_stored_data_columns.size());
@@ -242,7 +242,7 @@ Status BlockReader::_agg_key_next_block(Block* block, bool*
eof) {
}
if (!_get_next_row_same()) {
- if (target_block_row == _batch_size) {
+ if (target_block_row == _reader_context.batch_size) {
break;
}
_agg_data_counters.push_back(_last_agg_data_counter);
@@ -274,7 +274,7 @@ Status BlockReader::_unique_key_next_block(Block* block,
bool* eof) {
auto target_block_row = 0;
auto target_columns = block->mutate_columns();
if (UNLIKELY(_reader_context.record_rowids)) {
- _block_row_locations.resize(_batch_size);
+ _block_row_locations.resize(_reader_context.batch_size);
}
do {
@@ -301,7 +301,7 @@ Status BlockReader::_unique_key_next_block(Block* block,
bool* eof) {
LOG(WARNING) << "next failed: " << res;
return res;
}
- } while (target_block_row < _batch_size);
+ } while (target_block_row < _reader_context.batch_size);
// do filter delete row in base compaction, only base compaction need to
do the job
if (_filter_delete) {
@@ -351,7 +351,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns)
{
// execute aggregate when have `batch_size` column or some ref invalid soon
bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
- if (is_last || _stored_row_ref.size() == _batch_size) {
+ if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
_update_agg_data(columns);
}
}
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 3a666dd61b..b7313210e4 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -533,7 +533,6 @@ VCollectIterator::Level1Iterator::Level1Iterator(
_is_reverse(is_reverse),
_skip_same(skip_same) {
_ref.reset();
- _batch_size = reader->_batch_size;
// !_merge means that data are in order, so we just reverse children to
return data in reverse
if (!_merge && _is_reverse) {
_children.reverse();
@@ -703,8 +702,9 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block*
block) {
block->insert(cur_row.block->get_by_position(i).clone_empty());
}
+ auto batch_size = _reader->batch_size();
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
- _block_row_locations.resize(_batch_size);
+ _block_row_locations.resize(batch_size);
}
int continuous_row_in_block = 0;
do {
@@ -736,7 +736,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block*
block) {
LOG(WARNING) << "next failed: " << res;
return res;
}
- if (target_block_row >= _batch_size) {
+ if (target_block_row >= batch_size) {
if (continuous_row_in_block > 0) {
const auto& src_block = pre_row_ref.block;
for (size_t i = 0; i < column_count; ++i) {
diff --git a/be/src/vec/olap/vcollect_iterator.h
b/be/src/vec/olap/vcollect_iterator.h
index 218c92e99e..3018768d4a 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -298,9 +298,6 @@ private:
// used when `_merge == true`
std::unique_ptr<MergeHeap> _heap;
- // batch size, get from TabletReader
- int _batch_size;
-
std::vector<RowLocation> _block_row_locations;
};
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 0bc1b98da2..13eb7831ee 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -49,7 +49,6 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
<< ", version:" << read_params.version;
return res;
}
- _reader_context.batch_size = _batch_size;
_reader_context.is_vec = true;
_reader_context.is_vertical_compaction = true;
for (auto& rs_reader : rs_readers) {
@@ -125,7 +124,8 @@ void VerticalBlockReader::_init_agg_state(const
ReaderParams& read_params) {
return;
}
DCHECK(_return_columns.size() == _next_row.block->columns());
- _stored_data_columns =
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
+ _stored_data_columns =
+
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_string_tag.resize(_stored_data_columns.size());
@@ -155,7 +155,7 @@ void VerticalBlockReader::_init_agg_state(const
ReaderParams& read_params) {
Status VerticalBlockReader::init(const ReaderParams& read_params) {
StorageReadOptions opts;
- _batch_size = opts.block_row_max;
+ _reader_context.batch_size = opts.block_row_max;
RETURN_NOT_OK(TabletReader::init(read_params));
auto status = _init_collect_iter(read_params);
@@ -209,7 +209,7 @@ void VerticalBlockReader::_append_agg_data(MutableColumns&
columns) {
// execute aggregate when have `batch_size` column or some ref invalid soon
bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
- if (is_last || _stored_row_ref.size() == _batch_size) {
+ if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
_update_agg_data(columns);
}
}
@@ -332,7 +332,7 @@ Status VerticalBlockReader::_agg_key_next_block(Block*
block, bool* eof) {
}
DCHECK(_next_row.block->columns() == block->columns());
if (!_next_row.is_same) {
- if (target_block_row == _batch_size) {
+ if (target_block_row == _reader_context.batch_size) {
break;
}
_agg_data_counters.push_back(_last_agg_data_counter);
@@ -432,7 +432,7 @@ Status VerticalBlockReader::_unique_key_next_block(Block*
block, bool* eof) {
_next_row.row_pos);
}
++target_block_row;
- } while (target_block_row < _batch_size);
+ } while (target_block_row < _reader_context.batch_size);
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]