This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new db20e1f [refactor](storage) VGenericIterator to reuse Schema (#7858)
db20e1f is described below
commit db20e1f323744cd2fab91f120df644a4d47106be
Author: zuochunwei <[email protected]>
AuthorDate: Wed Feb 9 13:06:03 2022 +0800
[refactor](storage) VGenericIterator to reuse Schema (#7858)
1. Reuse Schema instead of copying it, because cloning a Schema creates a lot
of sub-Field objects.
2. Call the interface provided by Block (e.g. Block::compare_at) to reduce code lines.
---
be/src/olap/row_block2.h | 2 +-
be/src/olap/rowset/beta_rowset_reader.cpp | 6 +--
be/src/olap/rowset/beta_rowset_reader.h | 1 +
.../rowset/segment_v2/empty_segment_iterator.h | 2 +-
be/src/olap/rowset/segment_v2/segment_iterator.h | 3 +-
be/src/vec/olap/vgeneric_iterators.cpp | 61 ++++++++--------------
6 files changed, 29 insertions(+), 46 deletions(-)
diff --git a/be/src/olap/row_block2.h b/be/src/olap/row_block2.h
index b98ab95..7f2b79d 100644
--- a/be/src/olap/row_block2.h
+++ b/be/src/olap/row_block2.h
@@ -111,7 +111,7 @@ public:
private:
Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr&
mutable_column_ptr);
- Schema _schema;
+ const Schema& _schema;
size_t _capacity;
// _column_vector_batches[cid] == null if cid is not in `_schema`.
// memory are not allocated from `_pool` because we don't wan't to
reallocate them in clear()
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 263a4cc..3aed8eb 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -55,7 +55,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
_stats = _context->stats;
}
// SegmentIterator will load seek columns on demand
- Schema schema(_context->tablet_schema->columns(),
*(_context->return_columns));
+ _schema = std::make_unique<Schema>(_context->tablet_schema->columns(),
*(_context->return_columns));
// convert RowsetReaderContext to StorageReadOptions
StorageReadOptions read_options;
@@ -102,7 +102,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
std::unique_ptr<RowwiseIterator> iter;
- auto s = seg_ptr->new_iterator(schema, read_options, _parent_tracker,
&iter);
+ auto s = seg_ptr->new_iterator(*_schema, read_options,
_parent_tracker, &iter);
if (!s.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() <<
"]: " << s.to_string();
return OLAP_ERR_ROWSET_READER_INIT;
@@ -131,7 +131,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
_iterator.reset(final_iterator);
// init input block
- _input_block.reset(new RowBlockV2(schema,
+ _input_block.reset(new RowBlockV2(*_schema,
std::min(1024, read_context->batch_size), _parent_tracker));
if (!read_context->is_vec) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index add0c31..997ab12 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -58,6 +58,7 @@ public:
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
private:
+ std::unique_ptr<Schema> _schema;
RowsetReaderContext* _context;
BetaRowsetSharedPtr _rowset;
diff --git a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
index 3e1a4f9..0c186ed 100644
--- a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
@@ -35,7 +35,7 @@ public:
Status next_batch(vectorized::Block* block) override;
private:
- Schema _schema;
+ const Schema& _schema;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 0577526..2eae13e 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -103,8 +103,7 @@ private:
class BitmapRangeIterator;
std::shared_ptr<Segment> _segment;
- // TODO(zc): rethink if we need copy it
- Schema _schema;
+ const Schema& _schema;
// _column_iterators.size() == _schema.num_columns()
// _column_iterators[cid] == nullptr if cid is not in _schema
std::vector<ColumnIterator*> _column_iterators;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 3bee64b..f0f148d 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -100,7 +100,7 @@ public:
const Schema& schema() const override { return _schema; }
private:
- Schema _schema;
+ const Schema& _schema;
size_t _num_rows;
size_t _rows_returned;
};
@@ -136,12 +136,19 @@ public:
{
if (!_block) {
const Schema& schema = _iter->schema();
- for (auto &column_desc : schema.columns()) {
+ const auto& column_ids = schema.column_ids();
+ for (size_t i = 0; i < schema.num_column_ids(); ++i) {
+ auto column_desc = schema.column(column_ids[i]);
auto data_type =
Schema::get_data_type_ptr(column_desc->type());
if (data_type == nullptr) {
return Status::RuntimeError("invalid data type");
}
-
_block.insert(ColumnWithTypeAndName(data_type->create_column(), data_type,
column_desc->name()));
+ if (column_desc->is_nullable()) {
+ data_type =
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
+ }
+ auto column = data_type->create_column();
+ column->reserve(_block_row_max);
+ _block.insert(ColumnWithTypeAndName(std::move(column),
data_type, column_desc->name()));
}
} else {
_block.clear_column_data();
@@ -152,43 +159,17 @@ public:
// Initialize this context and will prepare data for current_row()
Status init(const StorageReadOptions& opts);
- int compare_row(const VMergeIteratorContext& rhs) const {
+ bool compare(const VMergeIteratorContext& rhs) const {
const Schema& schema = _iter->schema();
int num = schema.num_key_columns();
- for (uint32_t cid = 0; cid < num; ++cid) {
-#if 0
- auto name = schema.column(cid)->name();
- auto l_col = this->_block.get_by_name(name);
- auto r_col = rhs._block.get_by_name(name);
-
-#else
- //because the columns of block will be inserted by cid asc order
- //so no need to get column by get_by_name()
- auto l_col = this->_block.get_by_position(cid);
- auto r_col = rhs._block.get_by_position(cid);
-#endif
-
- auto l_cp = l_col.column;
- auto r_cp = r_col.column;
-
- auto res = l_cp->compare_at(_index_in_block, rhs._index_in_block,
*r_cp, -1);
- if (res) {
- return res;
- }
- }
-
- return 0;
- }
-
- bool compare(const VMergeIteratorContext& rhs) const {
- int cmp_res = this->compare_row(rhs);
+ int cmp_res = this->_block.compare_at(_index_in_block,
rhs._index_in_block, num, rhs._block, -1);
if (cmp_res != 0) {
return cmp_res > 0;
}
return this->data_id() < rhs.data_id();
}
- void copy_row_to(vectorized::Block* block) {
+ void copy_row(vectorized::Block* block) {
vectorized::Block& src = _block;
vectorized::Block& dst = *block;
@@ -230,9 +211,11 @@ private:
bool _valid = false;
size_t _index_in_block = -1;
+ int _block_row_max = 4096;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
+ _block_row_max = opts.block_row_max;
RETURN_IF_ERROR(_iter->init(opts));
RETURN_IF_ERROR(block_reset());
RETURN_IF_ERROR(_load_next_block());
@@ -246,7 +229,7 @@ Status VMergeIteratorContext::advance() {
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
- if (_index_in_block < _block.rows()) {
+ if (LIKELY(_index_in_block < _block.rows())) {
return Status::OK();
}
// current batch has no data, load next batch
@@ -299,7 +282,7 @@ private:
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
- std::unique_ptr<Schema> _schema;
+ const Schema* _schema = nullptr;
struct VMergeContextComparator {
bool operator()(const VMergeIteratorContext* lhs, const
VMergeIteratorContext* rhs) const {
@@ -320,10 +303,10 @@ Status VMergeIterator::init(const StorageReadOptions&
opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
- _schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
+ _schema = &(*_origin_iters.begin())->schema();
for (auto iter : _origin_iters) {
- std::unique_ptr<VMergeIteratorContext> ctx(new
VMergeIteratorContext(iter));
+ auto ctx = std::make_unique<VMergeIteratorContext>(iter);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@@ -347,7 +330,7 @@ Status VMergeIterator::next_batch(vectorized::Block* block)
{
_merge_heap.pop();
// copy current row to block
- ctx->copy_row_to(block);
+ ctx->copy_row(block);
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
@@ -383,7 +366,7 @@ public:
const Schema& schema() const override { return *_schema; }
private:
- std::unique_ptr<Schema> _schema;
+ const Schema* _schema = nullptr;
RowwiseIterator* _cur_iter = nullptr;
std::deque<RowwiseIterator*> _origin_iters;
};
@@ -396,8 +379,8 @@ Status VUnionIterator::init(const StorageReadOptions& opts)
{
for (auto iter : _origin_iters) {
RETURN_IF_ERROR(iter->init(opts));
}
- _schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
_cur_iter = *(_origin_iters.begin());
+ _schema = &_cur_iter->schema();
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]