This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d88b5a8f [improvement](vertical_compaction) reduce segments load in vertical merger (#14900)
75d88b5a8f is described below

commit 75d88b5a8ffb7d90025a96d6b7c8637d18ae87e0
Author: yixiutt <[email protected]>
AuthorDate: Mon Dec 12 15:06:46 2022 +0800

    [improvement](vertical_compaction) reduce segments load in vertical merger (#14900)
---
 be/src/olap/row.h                           |   4 +-
 be/src/vec/olap/vertical_block_reader.cpp   |  31 +++++++--
 be/src/vec/olap/vertical_block_reader.h     |   3 +-
 be/src/vec/olap/vertical_merge_iterator.cpp | 101 ++++++++++++++++++++--------
 be/src/vec/olap/vertical_merge_iterator.h   |  17 ++++-
 5 files changed, 118 insertions(+), 38 deletions(-)

diff --git a/be/src/olap/row.h b/be/src/olap/row.h
index cfb14462a0..7fa3767db1 100644
--- a/be/src/olap/row.h
+++ b/be/src/olap/row.h
@@ -209,7 +209,9 @@ uint32_t hash_row(const RowType& row, uint32_t seed) {
         FieldType type = row.schema()->column(cid)->type();
         // The approximation of float/double in a certain precision range, the binary of byte is not
         // a fixed value, so these two types are ignored in calculating hash code.
-        if (type == OLAP_FIELD_TYPE_FLOAT || type == OLAP_FIELD_TYPE_DOUBLE) {
+        // HLL type use flat map to store hash_set and it should be ignored
+        if (type == OLAP_FIELD_TYPE_FLOAT || type == OLAP_FIELD_TYPE_DOUBLE ||
+            type == OLAP_FIELD_TYPE_HLL) {
             continue;
         }
         seed = row.schema()->column(cid)->hash_code(row.cell(cid), seed);
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 02dafbefcc..53c93523bb 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -37,7 +37,8 @@ VerticalBlockReader::~VerticalBlockReader() {
 }
 
 Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_params,
-                                                   std::vector<RowwiseIterator*>* segment_iters) {
+                                                   std::vector<RowwiseIterator*>* segment_iters,
+                                                   std::vector<bool>* iterator_init_flag) {
     std::vector<RowsetReaderSharedPtr> rs_readers;
     auto res = _capture_rs_readers(read_params, &rs_readers);
     if (!res.ok()) {
@@ -54,6 +55,24 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
     for (auto& rs_reader : rs_readers) {
         // segment iterator will be inited here
         RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context, segment_iters));
+        // if segments overlapping, all segment iterator should be inited in
+        // heap merge iterator. If segments are none overlapping, only first segment of this
+        // rowset will be inited and push to heap, other segment will be inited later when current
+        // segment reached it's end.
+        // Use this iterator_init_flag so we can load few segments in HeapMergeIterator to save memory
+        if (rs_reader->rowset()->is_segments_overlapping()) {
+            for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+                iterator_init_flag->push_back(true);
+            }
+        } else {
+            for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+                if (i == 0) {
+                    iterator_init_flag->push_back(true);
+                    continue;
+                }
+                iterator_init_flag->push_back(false);
+            }
+        }
         rs_reader->reset_read_options();
     }
     return Status::OK();
@@ -62,7 +81,9 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
 Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
     // get segment iterators
     std::vector<RowwiseIterator*> segment_iters;
-    RETURN_IF_ERROR(_get_segment_iterators(read_params, &segment_iters));
+    std::vector<bool> iterator_init_flag;
+    RETURN_IF_ERROR(_get_segment_iterators(read_params, &segment_iters, &iterator_init_flag));
+    CHECK(segment_iters.size() == iterator_init_flag.size());
 
     // build heap if key column iterator or build vertical merge iterator if value column
     auto ori_return_col_size = _return_columns.size();
@@ -71,9 +92,9 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
         if (read_params.tablet->tablet_schema()->has_sequence_col()) {
             seq_col_idx = read_params.tablet->tablet_schema()->sequence_col_idx();
         }
-        _vcollect_iter = new_vertical_heap_merge_iterator(segment_iters, ori_return_col_size,
-                                                          read_params.tablet->keys_type(),
-                                                          seq_col_idx, _row_sources_buffer);
+        _vcollect_iter = new_vertical_heap_merge_iterator(
+                segment_iters, iterator_init_flag, ori_return_col_size,
+                read_params.tablet->keys_type(), seq_col_idx, _row_sources_buffer);
     } else {
         _vcollect_iter = new_vertical_mask_merge_iterator(segment_iters, ori_return_col_size,
                                                           _row_sources_buffer);
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 7c2e99eacf..b701a84b55 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -70,7 +70,8 @@ private:
     Status _init_collect_iter(const ReaderParams& read_params);
 
     Status _get_segment_iterators(const ReaderParams& read_params,
-                                  std::vector<RowwiseIterator*>* segment_iters);
+                                  std::vector<RowwiseIterator*>* segment_iters,
+                                  std::vector<bool>* iterator_init_flag);
 
     void _init_agg_state(const ReaderParams& read_params);
     void _append_agg_data(MutableColumns& columns);
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 5e683d3bce..0d553e187d 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -272,11 +272,15 @@ void VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
 }
 
 Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
+    if (LIKELY(_inited)) {
+        return Status::OK();
+    }
     _block_row_max = opts.block_row_max;
     RETURN_IF_ERROR(_load_next_block());
     if (valid()) {
         RETURN_IF_ERROR(advance());
     }
+    _inited = true;
     return Status::OK();
 }
 
@@ -340,7 +344,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
     std::vector<RowSource> tmp_row_sources;
     while (_get_size(block) < _block_row_max) {
         if (_merge_heap.empty()) {
-            LOG(INFO) << "_merge_heap empty";
+            VLOG_NOTICE << "_merge_heap empty";
             break;
         }
 
@@ -380,6 +384,22 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
         if (ctx->valid()) {
             _merge_heap.push(ctx);
         } else {
+            // push next iterator in same rowset into heap
+            auto cur_order = ctx->order();
+            while (cur_order + 1 < _iterator_init_flags.size() &&
+                   !_iterator_init_flags[cur_order + 1]) {
+                auto next_ctx = _ori_iter_ctx[cur_order + 1];
+                DCHECK(next_ctx);
+                RETURN_IF_ERROR(next_ctx->init(_opts));
+                if (!next_ctx->valid()) {
+                    // next_ctx is empty segment, move to next
+                    ++cur_order;
+                    delete next_ctx;
+                    continue;
+                }
+                _merge_heap.push(next_ctx);
+                break;
+            }
             // Release ctx earlier to reduce resource consumed
             delete ctx;
         }
@@ -395,41 +415,68 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
     if (_origin_iters.empty()) {
         return Status::OK();
     }
+    DCHECK(_origin_iters.size() == _iterator_init_flags.size());
     _schema = &(*_origin_iters.begin())->schema();
 
     auto seg_order = 0;
+    // Init contxt depends on _iterator_init_flags
+    // for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator needs
+    // to be inited and [0-2] is in same rowset.
+    // Notice: if iterator[0] is empty it will be invalid when init succeed, but it
+    // will not be pushed into heap, we should init next one util we find a valid iter
+    // so this rowset can work in heap
+    bool pre_iter_invalid = false;
     for (auto iter : _origin_iters) {
-        auto ctx = std::make_unique<VerticalMergeIteratorContext>(iter, _ori_return_cols, seg_order,
-                                                                  _seq_col_idx);
-        RETURN_IF_ERROR(ctx->init(opts));
-        if (!ctx->valid()) {
-            continue;
+        VerticalMergeIteratorContext* ctx =
+                new VerticalMergeIteratorContext(iter, _ori_return_cols, seg_order, _seq_col_idx);
+        _ori_iter_ctx.push_back(ctx);
+        if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
+            RETURN_IF_ERROR(ctx->init(opts));
+            if (!ctx->valid()) {
+                pre_iter_invalid = true;
+                ++seg_order;
+                delete ctx;
+                continue;
+            }
+            _merge_heap.push(ctx);
+            pre_iter_invalid = false;
         }
-        _merge_heap.push(ctx.release());
         ++seg_order;
     }
     _origin_iters.clear();
 
+    _opts = opts;
     _block_row_max = opts.block_row_max;
     return Status::OK();
 }
 
 //  ----------------  VerticalMaskMergeIterator  -------------  //
+Status VerticalMaskMergeIterator::check_all_iter_finished() {
+    for (auto iter : _origin_iter_ctx) {
+        if (iter->inited()) {
+            RETURN_IF_ERROR(iter->advance());
+            DCHECK(!iter->valid());
+        }
+    }
+    return Status::OK();
+}
 Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
     DCHECK(_row_sources_buf);
     auto st = _row_sources_buf->has_remaining();
     if (!st.ok()) {
         if (st.is<END_OF_FILE>()) {
-            for (auto iter : _origin_iter_ctx) {
-                RETURN_IF_ERROR(iter->advance());
-                DCHECK(!iter->valid());
-            }
+            RETURN_IF_ERROR(check_all_iter_finished());
         }
         return st;
     }
+
     auto row_source = _row_sources_buf->current();
     uint16_t order = row_source.get_source_num();
     auto& ctx = _origin_iter_ctx[order];
+    // init ctx and this ctx must be valid
+    RETURN_IF_ERROR(ctx->init(_opts));
+    DCHECK(ctx->valid());
+
     if (UNLIKELY(ctx->is_first_row())) {
         // first row in block, don't call ctx->advance
         // Except first row, we call advance first and than get cur row
@@ -455,6 +502,9 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
         auto row_source = _row_sources_buf->current();
         uint16_t order = row_source.get_source_num();
         auto& ctx = _origin_iter_ctx[order];
+        RETURN_IF_ERROR(ctx->init(_opts));
+        DCHECK(ctx->valid());
+
         if (UNLIKELY(ctx->is_first_row()) && !row_source.agg_flag()) {
             // first row in block, don't call ctx->advance
             // Except first row, we call advance first and than get cur row
@@ -471,11 +521,9 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
         }
         st = _row_sources_buf->has_remaining();
     }
+
     if (st.is<END_OF_FILE>()) {
-        for (auto iter : _origin_iter_ctx) {
-            RETURN_IF_ERROR(iter->advance());
-            DCHECK(!iter->valid());
-        }
+        RETURN_IF_ERROR(check_all_iter_finished());
     }
     return st;
 }
@@ -488,6 +536,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
         uint16_t order = _row_sources_buf->current().get_source_num();
         DCHECK(order < _origin_iter_ctx.size());
         auto& ctx = _origin_iter_ctx[order];
+        RETURN_IF_ERROR(ctx->init(_opts));
+        DCHECK(ctx->valid());
 
         // find max same source count in cur ctx
         size_t limit = std::min(ctx->remain_rows(), _block_row_max - rows);
@@ -500,10 +550,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
         st = _row_sources_buf->has_remaining();
     }
     if (st.is<END_OF_FILE>()) {
-        for (auto iter : _origin_iter_ctx) {
-            RETURN_IF_ERROR(iter->advance());
-            DCHECK(!iter->valid());
-        }
+        RETURN_IF_ERROR(check_all_iter_finished());
     }
     return st;
 }
@@ -513,16 +560,12 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
         return Status::OK();
     }
     _schema = &(*_origin_iters.begin())->schema();
+    _opts = opts;
 
     for (auto iter : _origin_iters) {
         auto ctx = std::make_unique<VerticalMergeIteratorContext>(iter, _ori_return_cols, -1, -1);
-        RETURN_IF_ERROR(ctx->init(opts));
-        if (!ctx->valid()) {
-            continue;
-        }
         _origin_iter_ctx.emplace_back(ctx.release());
     }
-
     _origin_iters.clear();
 
     _block_row_max = opts.block_row_max;
@@ -531,10 +574,12 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
 
 // interfaces to create vertical merge iterator
 std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
-        const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols, KeysType keys_type,
-        uint32_t seq_col_idx, RowSourcesBuffer* row_sources) {
-    return std::make_shared<VerticalHeapMergeIterator>(std::move(inputs), ori_return_cols,
-                                                       keys_type, seq_col_idx, row_sources);
+        std::vector<RowwiseIterator*> inputs, const std::vector<bool>& iterator_init_flag,
+        size_t ori_return_cols, KeysType keys_type, uint32_t seq_col_idx,
+        RowSourcesBuffer* row_sources) {
+    return std::make_shared<VerticalHeapMergeIterator>(std::move(inputs), iterator_init_flag,
+                                                       ori_return_cols, keys_type, seq_col_idx,
+                                                       row_sources);
 }
 
 std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index d8ce2b516c..61cf723afc 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -174,6 +174,7 @@ public:
         ref->block = _block;
         ref->row_pos = _index_in_block;
     }
+    bool inited() const { return _inited; }
 
 private:
     // Load next block into _block
@@ -188,6 +189,7 @@ private:
     uint32_t _seq_col_idx = -1;
 
     bool _valid = false;
+    bool _inited = false;
     mutable bool _is_same = false;
     size_t _index_in_block = -1;
     size_t _block_row_max = 0;
@@ -206,10 +208,12 @@ private:
 class VerticalHeapMergeIterator : public RowwiseIterator {
 public:
     // VerticalMergeIterator takes the ownership of input iterators
-    VerticalHeapMergeIterator(std::vector<RowwiseIterator*> iters, size_t ori_return_cols,
+    VerticalHeapMergeIterator(std::vector<RowwiseIterator*> iters,
+                              std::vector<bool> iterator_init_flags, size_t ori_return_cols,
                               KeysType keys_type, int32_t seq_col_idx,
                               RowSourcesBuffer* row_sources_buf)
             : _origin_iters(std::move(iters)),
+              _iterator_init_flags(iterator_init_flags),
               _ori_return_cols(ori_return_cols),
               _keys_type(keys_type),
               _seq_col_idx(seq_col_idx),
@@ -234,6 +238,7 @@ private:
 private:
     // It will be released after '_merge_heap' has been built.
     std::vector<RowwiseIterator*> _origin_iters;
+    std::vector<bool> _iterator_init_flags;
     size_t _ori_return_cols;
 
     const Schema* _schema = nullptr;
@@ -250,11 +255,13 @@ private:
                                            VerticalMergeContextComparator>;
 
     VMergeHeap _merge_heap;
+    std::vector<VerticalMergeIteratorContext*> _ori_iter_ctx;
     int _block_row_max = 0;
     KeysType _keys_type;
     int32_t _seq_col_idx = -1;
     RowSourcesBuffer* _row_sources_buf;
     uint32_t _merged_rows = 0;
+    StorageReadOptions _opts;
 };
 
 // --------------- VerticalMaskMergeIterator ------------- //
@@ -286,6 +293,8 @@ public:
 private:
     int _get_size(Block* block) { return block->rows(); }
 
+    Status check_all_iter_finished();
+
 private:
     // released after build ctx
     std::vector<RowwiseIterator*> _origin_iters;
@@ -297,12 +306,14 @@ private:
 
     int _block_row_max = 0;
     RowSourcesBuffer* _row_sources_buf;
+    StorageReadOptions _opts;
 };
 
 // segment merge iterator
 std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
-        const std::vector<RowwiseIterator*>& inputs, size_t _ori_return_cols, KeysType key_type,
-        uint32_t seq_col_idx, RowSourcesBuffer* row_sources_buf);
+        std::vector<RowwiseIterator*> inputs, const std::vector<bool>& iterator_init_flag,
+        size_t _ori_return_cols, KeysType key_type, uint32_t seq_col_idx,
+        RowSourcesBuffer* row_sources_buf);
 
 std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
         const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to