This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75d88b5a8f [improvement](vertical_compaction) reduce segments load in
vertical merger (#14900)
75d88b5a8f is described below
commit 75d88b5a8ffb7d90025a96d6b7c8637d18ae87e0
Author: yixiutt <[email protected]>
AuthorDate: Mon Dec 12 15:06:46 2022 +0800
[improvement](vertical_compaction) reduce segments load in vertical merger
(#14900)
---
be/src/olap/row.h | 4 +-
be/src/vec/olap/vertical_block_reader.cpp | 31 +++++++--
be/src/vec/olap/vertical_block_reader.h | 3 +-
be/src/vec/olap/vertical_merge_iterator.cpp | 101 ++++++++++++++++++++--------
be/src/vec/olap/vertical_merge_iterator.h | 17 ++++-
5 files changed, 118 insertions(+), 38 deletions(-)
diff --git a/be/src/olap/row.h b/be/src/olap/row.h
index cfb14462a0..7fa3767db1 100644
--- a/be/src/olap/row.h
+++ b/be/src/olap/row.h
@@ -209,7 +209,9 @@ uint32_t hash_row(const RowType& row, uint32_t seed) {
FieldType type = row.schema()->column(cid)->type();
// The approximation of float/double in a certain precision range, the
binary of byte is not
// a fixed value, so these two types are ignored in calculating hash
code.
- if (type == OLAP_FIELD_TYPE_FLOAT || type == OLAP_FIELD_TYPE_DOUBLE) {
+ // HLL stores its hash set in a flat map, so it is ignored as well
+ if (type == OLAP_FIELD_TYPE_FLOAT || type == OLAP_FIELD_TYPE_DOUBLE ||
+ type == OLAP_FIELD_TYPE_HLL) {
continue;
}
seed = row.schema()->column(cid)->hash_code(row.cell(cid), seed);
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 02dafbefcc..53c93523bb 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -37,7 +37,8 @@ VerticalBlockReader::~VerticalBlockReader() {
}
Status VerticalBlockReader::_get_segment_iterators(const ReaderParams&
read_params,
-
std::vector<RowwiseIterator*>* segment_iters) {
+
std::vector<RowwiseIterator*>* segment_iters,
+ std::vector<bool>*
iterator_init_flag) {
std::vector<RowsetReaderSharedPtr> rs_readers;
auto res = _capture_rs_readers(read_params, &rs_readers);
if (!res.ok()) {
@@ -54,6 +55,24 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
for (auto& rs_reader : rs_readers) {
// segment iterator will be inited here
RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context,
segment_iters));
+ // If segments overlap, every segment iterator must be inited in the
+ // heap merge iterator. If segments are non-overlapping, only the first
+ // segment of this rowset is inited and pushed into the heap; the other
+ // segments are inited later, when the current segment reaches its end.
+ // iterator_init_flag lets HeapMergeIterator load only a few segments at
+ // a time to save memory
+ if (rs_reader->rowset()->is_segments_overlapping()) {
+ for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+ iterator_init_flag->push_back(true);
+ }
+ } else {
+ for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+ if (i == 0) {
+ iterator_init_flag->push_back(true);
+ continue;
+ }
+ iterator_init_flag->push_back(false);
+ }
+ }
rs_reader->reset_read_options();
}
return Status::OK();
@@ -62,7 +81,9 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
Status VerticalBlockReader::_init_collect_iter(const ReaderParams&
read_params) {
// get segment iterators
std::vector<RowwiseIterator*> segment_iters;
- RETURN_IF_ERROR(_get_segment_iterators(read_params, &segment_iters));
+ std::vector<bool> iterator_init_flag;
+ RETURN_IF_ERROR(_get_segment_iterators(read_params, &segment_iters,
&iterator_init_flag));
+ CHECK(segment_iters.size() == iterator_init_flag.size());
// build heap if key column iterator or build vertical merge iterator if
value column
auto ori_return_col_size = _return_columns.size();
@@ -71,9 +92,9 @@ Status VerticalBlockReader::_init_collect_iter(const
ReaderParams& read_params)
if (read_params.tablet->tablet_schema()->has_sequence_col()) {
seq_col_idx =
read_params.tablet->tablet_schema()->sequence_col_idx();
}
- _vcollect_iter = new_vertical_heap_merge_iterator(segment_iters,
ori_return_col_size,
-
read_params.tablet->keys_type(),
- seq_col_idx,
_row_sources_buffer);
+ _vcollect_iter = new_vertical_heap_merge_iterator(
+ segment_iters, iterator_init_flag, ori_return_col_size,
+ read_params.tablet->keys_type(), seq_col_idx,
_row_sources_buffer);
} else {
_vcollect_iter = new_vertical_mask_merge_iterator(segment_iters,
ori_return_col_size,
_row_sources_buffer);
diff --git a/be/src/vec/olap/vertical_block_reader.h
b/be/src/vec/olap/vertical_block_reader.h
index 7c2e99eacf..b701a84b55 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -70,7 +70,8 @@ private:
Status _init_collect_iter(const ReaderParams& read_params);
Status _get_segment_iterators(const ReaderParams& read_params,
- std::vector<RowwiseIterator*>*
segment_iters);
+ std::vector<RowwiseIterator*>* segment_iters,
+ std::vector<bool>* iterator_init_flag);
void _init_agg_state(const ReaderParams& read_params);
void _append_agg_data(MutableColumns& columns);
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp
b/be/src/vec/olap/vertical_merge_iterator.cpp
index 5e683d3bce..0d553e187d 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -272,11 +272,15 @@ void VerticalMergeIteratorContext::copy_rows(Block*
block, bool advanced) {
}
Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
+ if (LIKELY(_inited)) {
+ return Status::OK();
+ }
_block_row_max = opts.block_row_max;
RETURN_IF_ERROR(_load_next_block());
if (valid()) {
RETURN_IF_ERROR(advance());
}
+ _inited = true;
return Status::OK();
}
@@ -340,7 +344,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
std::vector<RowSource> tmp_row_sources;
while (_get_size(block) < _block_row_max) {
if (_merge_heap.empty()) {
- LOG(INFO) << "_merge_heap empty";
+ VLOG_NOTICE << "_merge_heap empty";
break;
}
@@ -380,6 +384,22 @@ Status VerticalHeapMergeIterator::next_batch(Block* block)
{
if (ctx->valid()) {
_merge_heap.push(ctx);
} else {
+ // push the next iterator of the same rowset into the heap
+ auto cur_order = ctx->order();
+ while (cur_order + 1 < _iterator_init_flags.size() &&
+ !_iterator_init_flags[cur_order + 1]) {
+ auto next_ctx = _ori_iter_ctx[cur_order + 1];
+ DCHECK(next_ctx);
+ RETURN_IF_ERROR(next_ctx->init(_opts));
+ if (!next_ctx->valid()) {
+ // next_ctx is an empty segment, move on to the next one
+ ++cur_order;
+ delete next_ctx;
+ continue;
+ }
+ _merge_heap.push(next_ctx);
+ break;
+ }
// Release ctx earlier to reduce resource consumed
delete ctx;
}
@@ -395,41 +415,68 @@ Status VerticalHeapMergeIterator::init(const
StorageReadOptions& opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
+ DCHECK(_origin_iters.size() == _iterator_init_flags.size());
_schema = &(*_origin_iters.begin())->schema();
auto seg_order = 0;
+ // Init contexts according to _iterator_init_flags.
+ // For example, the vector [1,0,0,1,1] means that iterators 0, 3 and 4 need
+ // to be inited up front, and iterators [0-2] belong to the same rowset.
+ // Notice: if iterator[0] is empty, it is invalid even though init succeeds
+ // and is not pushed into the heap, so we keep initing the following
+ // iterators until we find a valid one; this keeps the rowset represented
+ // in the heap
+ bool pre_iter_invalid = false;
for (auto iter : _origin_iters) {
- auto ctx = std::make_unique<VerticalMergeIteratorContext>(iter,
_ori_return_cols, seg_order,
-
_seq_col_idx);
- RETURN_IF_ERROR(ctx->init(opts));
- if (!ctx->valid()) {
- continue;
+ VerticalMergeIteratorContext* ctx =
+ new VerticalMergeIteratorContext(iter, _ori_return_cols,
seg_order, _seq_col_idx);
+ _ori_iter_ctx.push_back(ctx);
+ if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
+ RETURN_IF_ERROR(ctx->init(opts));
+ if (!ctx->valid()) {
+ pre_iter_invalid = true;
+ ++seg_order;
+ delete ctx;
+ continue;
+ }
+ _merge_heap.push(ctx);
+ pre_iter_invalid = false;
}
- _merge_heap.push(ctx.release());
++seg_order;
}
_origin_iters.clear();
+ _opts = opts;
_block_row_max = opts.block_row_max;
return Status::OK();
}
// ---------------- VerticalMaskMergeIterator ------------- //
+Status VerticalMaskMergeIterator::check_all_iter_finished() {
+ for (auto iter : _origin_iter_ctx) {
+ if (iter->inited()) {
+ RETURN_IF_ERROR(iter->advance());
+ DCHECK(!iter->valid());
+ }
+ }
+ return Status::OK();
+}
Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
DCHECK(_row_sources_buf);
auto st = _row_sources_buf->has_remaining();
if (!st.ok()) {
if (st.is<END_OF_FILE>()) {
- for (auto iter : _origin_iter_ctx) {
- RETURN_IF_ERROR(iter->advance());
- DCHECK(!iter->valid());
- }
+ RETURN_IF_ERROR(check_all_iter_finished());
}
return st;
}
+
auto row_source = _row_sources_buf->current();
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
+ // init ctx lazily; it must be valid at this point
+ RETURN_IF_ERROR(ctx->init(_opts));
+ DCHECK(ctx->valid());
+
if (UNLIKELY(ctx->is_first_row())) {
// first row in block, don't call ctx->advance
// Except first row, we call advance first and than get cur row
@@ -455,6 +502,9 @@ Status
VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
auto row_source = _row_sources_buf->current();
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
+ RETURN_IF_ERROR(ctx->init(_opts));
+ DCHECK(ctx->valid());
+
if (UNLIKELY(ctx->is_first_row()) && !row_source.agg_flag()) {
// first row in block, don't call ctx->advance
// Except first row, we call advance first and than get cur row
@@ -471,11 +521,9 @@ Status
VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
}
st = _row_sources_buf->has_remaining();
}
+
if (st.is<END_OF_FILE>()) {
- for (auto iter : _origin_iter_ctx) {
- RETURN_IF_ERROR(iter->advance());
- DCHECK(!iter->valid());
- }
+ RETURN_IF_ERROR(check_all_iter_finished());
}
return st;
}
@@ -488,6 +536,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
uint16_t order = _row_sources_buf->current().get_source_num();
DCHECK(order < _origin_iter_ctx.size());
auto& ctx = _origin_iter_ctx[order];
+ RETURN_IF_ERROR(ctx->init(_opts));
+ DCHECK(ctx->valid());
// find max same source count in cur ctx
size_t limit = std::min(ctx->remain_rows(), _block_row_max - rows);
@@ -500,10 +550,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block)
{
st = _row_sources_buf->has_remaining();
}
if (st.is<END_OF_FILE>()) {
- for (auto iter : _origin_iter_ctx) {
- RETURN_IF_ERROR(iter->advance());
- DCHECK(!iter->valid());
- }
+ RETURN_IF_ERROR(check_all_iter_finished());
}
return st;
}
@@ -513,16 +560,12 @@ Status VerticalMaskMergeIterator::init(const
StorageReadOptions& opts) {
return Status::OK();
}
_schema = &(*_origin_iters.begin())->schema();
+ _opts = opts;
for (auto iter : _origin_iters) {
auto ctx = std::make_unique<VerticalMergeIteratorContext>(iter,
_ori_return_cols, -1, -1);
- RETURN_IF_ERROR(ctx->init(opts));
- if (!ctx->valid()) {
- continue;
- }
_origin_iter_ctx.emplace_back(ctx.release());
}
-
_origin_iters.clear();
_block_row_max = opts.block_row_max;
@@ -531,10 +574,12 @@ Status VerticalMaskMergeIterator::init(const
StorageReadOptions& opts) {
// interfaces to create vertical merge iterator
std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
- const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols,
KeysType keys_type,
- uint32_t seq_col_idx, RowSourcesBuffer* row_sources) {
- return std::make_shared<VerticalHeapMergeIterator>(std::move(inputs),
ori_return_cols,
- keys_type, seq_col_idx,
row_sources);
+ std::vector<RowwiseIterator*> inputs, const std::vector<bool>&
iterator_init_flag,
+ size_t ori_return_cols, KeysType keys_type, uint32_t seq_col_idx,
+ RowSourcesBuffer* row_sources) {
+ return std::make_shared<VerticalHeapMergeIterator>(std::move(inputs),
iterator_init_flag,
+ ori_return_cols,
keys_type, seq_col_idx,
+ row_sources);
}
std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
diff --git a/be/src/vec/olap/vertical_merge_iterator.h
b/be/src/vec/olap/vertical_merge_iterator.h
index d8ce2b516c..61cf723afc 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -174,6 +174,7 @@ public:
ref->block = _block;
ref->row_pos = _index_in_block;
}
+ bool inited() const { return _inited; }
private:
// Load next block into _block
@@ -188,6 +189,7 @@ private:
uint32_t _seq_col_idx = -1;
bool _valid = false;
+ bool _inited = false;
mutable bool _is_same = false;
size_t _index_in_block = -1;
size_t _block_row_max = 0;
@@ -206,10 +208,12 @@ private:
class VerticalHeapMergeIterator : public RowwiseIterator {
public:
// VerticalMergeIterator takes the ownership of input iterators
- VerticalHeapMergeIterator(std::vector<RowwiseIterator*> iters, size_t
ori_return_cols,
+ VerticalHeapMergeIterator(std::vector<RowwiseIterator*> iters,
+ std::vector<bool> iterator_init_flags, size_t
ori_return_cols,
KeysType keys_type, int32_t seq_col_idx,
RowSourcesBuffer* row_sources_buf)
: _origin_iters(std::move(iters)),
+ _iterator_init_flags(iterator_init_flags),
_ori_return_cols(ori_return_cols),
_keys_type(keys_type),
_seq_col_idx(seq_col_idx),
@@ -234,6 +238,7 @@ private:
private:
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
+ std::vector<bool> _iterator_init_flags;
size_t _ori_return_cols;
const Schema* _schema = nullptr;
@@ -250,11 +255,13 @@ private:
VerticalMergeContextComparator>;
VMergeHeap _merge_heap;
+ std::vector<VerticalMergeIteratorContext*> _ori_iter_ctx;
int _block_row_max = 0;
KeysType _keys_type;
int32_t _seq_col_idx = -1;
RowSourcesBuffer* _row_sources_buf;
uint32_t _merged_rows = 0;
+ StorageReadOptions _opts;
};
// --------------- VerticalMaskMergeIterator ------------- //
@@ -286,6 +293,8 @@ public:
private:
int _get_size(Block* block) { return block->rows(); }
+ Status check_all_iter_finished();
+
private:
// released after build ctx
std::vector<RowwiseIterator*> _origin_iters;
@@ -297,12 +306,14 @@ private:
int _block_row_max = 0;
RowSourcesBuffer* _row_sources_buf;
+ StorageReadOptions _opts;
};
// segment merge iterator
std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
- const std::vector<RowwiseIterator*>& inputs, size_t _ori_return_cols,
KeysType key_type,
- uint32_t seq_col_idx, RowSourcesBuffer* row_sources_buf);
+ std::vector<RowwiseIterator*> inputs, const std::vector<bool>&
iterator_init_flag,
+ size_t _ori_return_cols, KeysType key_type, uint32_t seq_col_idx,
+ RowSourcesBuffer* row_sources_buf);
std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]