This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 737c73dcf0 [Improvement](topn) order by key topn query optimization 
(#15663)
737c73dcf0 is described below

commit 737c73dcf015d21ab0f54b4c4ddb1b59eaa4466d
Author: Kang <[email protected]>
AuthorDate: Mon Feb 6 15:36:05 2023 +0800

    [Improvement](topn) order by key topn query optimization (#15663)
---
 be/src/olap/iterators.h                            |   3 +
 be/src/olap/reader.cpp                             |   5 +-
 be/src/olap/reader.h                               |   5 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |   8 +
 be/src/olap/rowset/beta_rowset_reader.h            |   2 +
 be/src/olap/rowset/rowset_reader_context.h         |   5 +
 .../rowset/segment_v2/empty_segment_iterator.h     |   1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |   8 +-
 be/src/runtime/runtime_predicate.cpp               |   7 +
 be/src/vec/core/block.cpp                          |  39 ++++-
 be/src/vec/core/block.h                            |  52 +++++-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   8 +-
 be/src/vec/olap/block_reader.cpp                   |  12 +-
 be/src/vec/olap/vcollect_iterator.cpp              | 177 +++++++++++++++++++++
 be/src/vec/olap/vcollect_iterator.h                |  27 ++++
 .../org/apache/doris/planner/OlapScanNode.java     |  10 ++
 .../org/apache/doris/planner/OriginalPlanner.java  |  10 +-
 17 files changed, 365 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 36f765ac88..61ce931e52 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -175,6 +175,9 @@ public:
     virtual bool update_profile(RuntimeProfile* profile) { return false; }
     // return rows merged count by iterator
     virtual uint64_t merged_rows() const { return 0; }
+
+    // Returns true if this iterator is empty; the base implementation returns false.
+    virtual bool empty() const { return false; }
 };
 
 } // namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index babc41a10f..2207836650 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -204,6 +204,8 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params,
     _reader_context.need_ordered_result = need_ordered_result;
     _reader_context.use_topn_opt = read_params.use_topn_opt;
     _reader_context.read_orderby_key_reverse = 
read_params.read_orderby_key_reverse;
+    _reader_context.read_orderby_key_limit = 
read_params.read_orderby_key_limit;
+    _reader_context.filter_block_vconjunct_ctx_ptr = 
read_params.filter_block_vconjunct_ctx_ptr;
     _reader_context.return_columns = &_return_columns;
     _reader_context.read_orderby_key_columns =
             _orderby_key_columns.size() > 0 ? &_orderby_key_columns : nullptr;
@@ -418,7 +420,8 @@ Status TabletReader::_init_orderby_keys_param(const 
ReaderParams& read_params) {
     }
 
     // UNIQUE_KEYS will compare all keys as before
-    if (_tablet_schema->keys_type() == DUP_KEYS) {
+    if (_tablet_schema->keys_type() == DUP_KEYS || 
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
+                                                    
_tablet->enable_unique_key_merge_on_write())) {
         // find index in vector _return_columns
         //   for the read_orderby_key_num_prefix_columns orderby keys
         for (uint32_t i = 0; i < 
read_params.read_orderby_key_num_prefix_columns; i++) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index ea74256e1e..b9bafd6fed 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -28,6 +28,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
 #include "util/runtime_profile.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 
@@ -120,6 +121,10 @@ public:
         bool read_orderby_key_reverse = false;
         // num of columns for orderby key
         size_t read_orderby_key_num_prefix_columns = 0;
+        // limit of rows for read_orderby_key
+        size_t read_orderby_key_limit = 0;
+        // filter_block arguments
+        vectorized::VExprContext** filter_block_vconjunct_ctx_ptr = nullptr;
 
         // for vertical compaction
         bool is_key_column_group = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 3b45b3cf3f..0852641a03 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -178,9 +178,13 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
             LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
             return Status::Error<ROWSET_READER_INIT>();
         }
+        if (iter->empty()) {
+            continue;
+        }
         seg_iterators.push_back(std::move(iter));
     }
 
+    std::vector<RowwiseIterator*> iterators;
     for (auto& owned_it : seg_iterators) {
         auto st = owned_it->init(_read_options);
         if (!st.ok()) {
@@ -223,6 +227,10 @@ Status BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
 
 Status BetaRowsetReader::next_block(vectorized::Block* block) {
     SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
+    if (_empty) {
+        return Status::Error<END_OF_FILE>();
+    }
+
     do {
         auto s = _iterator->next_batch(block);
         if (!s.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index dea6814558..53bc73c156 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -84,6 +84,8 @@ private:
     SegmentCacheHandle _segment_cache_handle;
 
     StorageReadOptions _read_options;
+
+    bool _empty = false;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index 826f753080..cd52884fc9 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -22,6 +22,7 @@
 #include "olap/olap_common.h"
 #include "runtime/runtime_state.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 
@@ -42,6 +43,10 @@ struct RowsetReaderContext {
     bool read_orderby_key_reverse = false;
     // columns for orderby keys
     std::vector<uint32_t>* read_orderby_key_columns = nullptr;
+    // limit of rows for read_orderby_key
+    size_t read_orderby_key_limit = 0;
+    // filter_block arguments
+    vectorized::VExprContext** filter_block_vconjunct_ctx_ptr = nullptr;
     // projection columns: the set of columns rowset reader should return
     const std::vector<uint32_t>* return_columns = nullptr;
     TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
diff --git a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h 
b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
index 5571ee6756..0bd7014c61 100644
--- a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
@@ -32,6 +32,7 @@ public:
     Status init(const StorageReadOptions& opts) override { return 
Status::OK(); }
     const Schema& schema() const override { return _schema; }
     Status next_batch(vectorized::Block* block) override;
+    bool empty() const override { return true; } // always reports this iterator as empty
 
 private:
     const Schema& _schema;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 99721e292a..ac1c058f27 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -969,7 +969,13 @@ void SegmentIterator::_vec_init_lazy_materialization() {
     }
 
     // add runtime predicate to _col_predicates
-    if (_opts.use_topn_opt) {
+    // should NOT add for order by key,
+    //  since key is already sorted and topn_next only need first N rows from 
each segment,
+    //  but runtime predicate will filter some rows and read more than N rows.
+    // should add it for order by non-key column, since a non-key column is 
not sorted and
+    //  all rows should be read, so runtime predicate will reduce rows for 
topn node
+    if (_opts.use_topn_opt &&
+        !(_opts.read_orderby_key_columns != nullptr && 
!_opts.read_orderby_key_columns->empty())) {
         auto& runtime_predicate =
                 
_opts.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
         _runtime_predicate = runtime_predicate.get_predictate();
diff --git a/be/src/runtime/runtime_predicate.cpp 
b/be/src/runtime/runtime_predicate.cpp
index 0ee4f8915a..437fa49399 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -113,6 +113,11 @@ Status RuntimePredicate::init(const PrimitiveType type) {
 Status RuntimePredicate::update(const Field& value, const String& col_name, 
bool is_reverse) {
     std::unique_lock<std::shared_mutex> wlock(_rwlock);
 
+    // TODO why null
+    if (!_tablet_schema) {
+        return Status::OK();
+    }
+
     bool updated = false;
 
     if (UNLIKELY(_orderby_extrem.is_null())) {
@@ -142,6 +147,8 @@ Status RuntimePredicate::update(const Field& value, const 
String& col_name, bool
     // get value string from _orderby_extrem and push back to condition_values
     condition.condition_values.push_back(_get_value_fn(_orderby_extrem));
 
+    VLOG_DEBUG << "update runtime predicate condition " << condition;
+
     // update _predictate
     _predictate.reset(
             parse_to_predicate(_tablet_schema, condition, 
_predicate_mem_pool.get(), false));
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index b40cdc76b6..b608a6ac49 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -830,8 +830,10 @@ MutableBlock::MutableBlock(const 
std::vector<TupleDescriptor*>& tuple_descs, int
             if (reserve_size != 0) {
                 _columns.back()->reserve(reserve_size);
             }
+            _names.push_back(slot_desc->col_name());
         }
     }
+    initialize_index_by_name();
 }
 
 size_t MutableBlock::rows() const {
@@ -847,12 +849,16 @@ size_t MutableBlock::rows() const {
 void MutableBlock::swap(MutableBlock& another) noexcept {
+    // Exchanges columns, data types and names with `another`, then rebuilds the
+    // name index of both blocks so that neither side keeps the other's names.
     _columns.swap(another._columns);
     _data_types.swap(another._data_types);
+    _names.swap(another._names);
+    // Clear explicitly so no stale name survives the rebuild.
+    index_by_name.clear();
+    initialize_index_by_name();
+    another.index_by_name.clear();
+    another.initialize_index_by_name();
 }
 
 void MutableBlock::swap(MutableBlock&& another) noexcept {
     clear();
     _columns = std::move(another._columns);
     _data_types = std::move(another._data_types);
+    _names = std::move(another._names);
+    initialize_index_by_name();
 }
 
 void MutableBlock::add_row(const Block* block, int row) {
@@ -887,7 +893,7 @@ Block MutableBlock::to_block(int start_column) {
 Block MutableBlock::to_block(int start_column, int end_column) {
     ColumnsWithTypeAndName columns_with_schema;
     for (size_t i = start_column; i < end_column; ++i) {
-        columns_with_schema.emplace_back(std::move(_columns[i]), 
_data_types[i], "");
+        columns_with_schema.emplace_back(std::move(_columns[i]), 
_data_types[i], _names[i]);
     }
     return {columns_with_schema};
 }
@@ -977,4 +983,35 @@ void MutableBlock::clear_column_data() noexcept {
     }
 }
 
+// Rebuilds index_by_name from _names so that each column name maps to its
+// position. The map is cleared first: this is called after _names has been
+// replaced (swap, move assignment, merge into an empty block), and without
+// the clear the previous names would keep resolving through has() and
+// get_position_by_name().
+void MutableBlock::initialize_index_by_name() {
+    index_by_name.clear();
+    for (size_t i = 0, size = _names.size(); i < size; ++i) {
+        index_by_name[_names[i]] = i;
+    }
+}
+
+// Tells whether a column called `name` is present in the name index.
+bool MutableBlock::has(const std::string& name) const {
+    const auto found = index_by_name.find(name);
+    return found != index_by_name.end();
+}
+
+// Looks up the position of column `name` through the name index. An unknown
+// name is reported with LOG(FATAL), together with the names the block holds.
+size_t MutableBlock::get_position_by_name(const std::string& name) const {
+    const auto entry = index_by_name.find(name);
+    const bool missing = (entry == index_by_name.end());
+    if (missing) {
+        LOG(FATAL) << fmt::format("Not found column {} in block. There are only columns: {}", name,
+                                  dump_names());
+    }
+
+    return entry->second;
+}
+
+// Joins all column names into one string, separated by ", ".
+std::string MutableBlock::dump_names() const {
+    std::stringstream joined;
+    bool first = true;
+    for (const auto& column_name : _names) {
+        if (!first) {
+            joined << ", ";
+        }
+        joined << column_name;
+        first = false;
+    }
+    return joined.str();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 959480f479..d5032cd568 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -380,6 +380,10 @@ class MutableBlock {
 private:
     MutableColumns _columns;
     DataTypes _data_types;
+    Names _names;
+
+    using IndexByName = phmap::flat_hash_map<String, size_t>;
+    IndexByName index_by_name;
 
 public:
     static MutableBlock build_mutable_block(Block* block) {
@@ -392,13 +396,23 @@ public:
                  bool igore_trivial_slot = false);
 
     MutableBlock(Block* block)
-            : _columns(block->mutate_columns()), 
_data_types(block->get_data_types()) {}
+            : _columns(block->mutate_columns()),
+              _data_types(block->get_data_types()),
+              _names(block->get_names()) {
+        initialize_index_by_name();
+    }
     MutableBlock(Block&& block)
-            : _columns(block.mutate_columns()), 
_data_types(block.get_data_types()) {}
+            : _columns(block.mutate_columns()),
+              _data_types(block.get_data_types()),
+              _names(block.get_names()) {
+        initialize_index_by_name();
+    }
 
     void operator=(MutableBlock&& m_block) {
         _columns = std::move(m_block._columns);
         _data_types = std::move(m_block._data_types);
+        _names = std::move(m_block._names);
+        initialize_index_by_name();
     }
 
     size_t rows() const;
@@ -439,10 +453,30 @@ public:
         }
         return 0;
     }
+
+    // Compares row `n` of this block with row `m` of `rhs`, looking only at the
+    // column positions listed in `compare_columns`, in that order. Returns the
+    // first non-zero per-column result, or 0 when every listed column compares
+    // equal. `nan_direction_hint` is forwarded unchanged to the column-level
+    // compare_at.
+    int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
+                   const MutableBlock& rhs, int nan_direction_hint) const {
+        DCHECK_GE(columns(), compare_columns->size());
+        DCHECK_GE(rhs.columns(), compare_columns->size());
+
+        // Row indices are zero-based, so they must be strictly below rows();
+        // a less-or-equal check would let the one-past-the-end index through.
+        DCHECK_LT(n, rows());
+        DCHECK_LT(m, rhs.rows());
+        for (auto i : *compare_columns) {
+            DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
+            auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
+                                                             nan_direction_hint);
+            if (res) {
+                return res;
+            }
+        }
+        return 0;
+    }
+
     template <typename T>
     void merge(T&& block) {
         if (_columns.size() == 0 && _data_types.size() == 0) {
             _data_types = block.get_data_types();
+            _names = block.get_names();
             _columns.resize(block.columns());
             for (size_t i = 0; i < block.columns(); ++i) {
                 if (block.get_by_position(i).column) {
@@ -453,6 +487,7 @@ public:
                     _columns[i] = _data_types[i]->create_column();
                 }
             }
+            initialize_index_by_name();
         } else {
             DCHECK_EQ(_columns.size(), block.columns());
             for (int i = 0; i < _columns.size(); ++i) {
@@ -495,6 +530,7 @@ public:
     void clear() {
         _columns.clear();
         _data_types.clear();
+        _names.clear();
     }
 
     void clear_column_data() noexcept;
@@ -509,6 +545,18 @@ public:
 
         return res;
     }
+
+    Names& get_names() { return _names; }
+
+    bool has(const std::string& name) const;
+
+    size_t get_position_by_name(const std::string& name) const;
+
+    /** Get a list of column names separated by commas. */
+    std::string dump_names() const;
+
+private:
+    void initialize_index_by_name();
 };
 
 struct IteratorRowRef {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0af436af00..0bd26eba74 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -338,10 +338,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
             }
             _tablet_reader_params.read_orderby_key_num_prefix_columns =
                     olap_scan_node.sort_info.is_asc_order.size();
+            _tablet_reader_params.read_orderby_key_limit = _limit;
+            _tablet_reader_params.filter_block_vconjunct_ctx_ptr = 
&_vconjunct_ctx;
         }
-    }
 
-    _tablet_reader_params.use_topn_opt = 
((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt;
+        // runtime predicate push down optimization for topn
+        _tablet_reader_params.use_topn_opt =
+                ((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt;
+    }
 
     return Status::OK();
 }
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 51a4457ce5..b872a80b1f 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -79,7 +79,10 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params,
     _reader_context.is_vec = true;
     _reader_context.push_down_agg_type_opt = 
read_params.push_down_agg_type_opt;
     for (auto& rs_reader : rs_readers) {
-        RETURN_NOT_OK(rs_reader->init(&_reader_context));
+        // _vcollect_iter.topn_next() will init rs_reader by itself
+        if (!_vcollect_iter.use_topn_next()) {
+            RETURN_NOT_OK(rs_reader->init(&_reader_context));
+        }
         Status res = _vcollect_iter.add_child(rs_reader);
         if (!res.ok() && !res.is<END_OF_FILE>()) {
             LOG(WARNING) << "failed to add child to iterator, err=" << res;
@@ -91,8 +94,11 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params,
     }
 
     RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
-    auto status = _vcollect_iter.current_row(&_next_row);
-    _eof = status.is<END_OF_FILE>();
+    // _vcollect_iter.topn_next() can not use current_row
+    if (!_vcollect_iter.use_topn_next()) {
+        auto status = _vcollect_iter.current_row(&_next_row);
+        _eof = status.is<END_OF_FILE>();
+    }
 
     return Status::OK();
 }
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 84d323a084..3a666dd61b 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -57,9 +57,23 @@ void VCollectIterator::init(TabletReader* reader, bool 
ori_data_overlapping, boo
         _merge = true;
     }
     _is_reverse = is_reverse;
+    // use topn_next opt only for DUP_KEYS and UNIQUE_KEYS with MOW
+    if (_reader->_reader_context.read_orderby_key_limit > 0 &&
+        (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
+         (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+          _reader->_tablet->enable_unique_key_merge_on_write()))) {
+        _topn_limit = _reader->_reader_context.read_orderby_key_limit;
+    } else {
+        _topn_limit = 0;
+    }
 }
 
 Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
+    if (use_topn_next()) {
+        _rs_readers.push_back(rs_reader);
+        return Status::OK();
+    }
+
     std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, 
_reader));
     _children.push_back(child.release());
     return Status::OK();
@@ -69,6 +83,10 @@ Status VCollectIterator::add_child(RowsetReaderSharedPtr 
rs_reader) {
 // status will be used as the base rowset, and the other rowsets will be 
merged first and
 // then merged with the base rowset.
 Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& 
rs_readers) {
+    if (use_topn_next()) {
+        return Status::OK();
+    }
+
     DCHECK(rs_readers.size() == _children.size());
     _skip_same = _reader->_tablet_schema->keys_type() == KeysType::UNIQUE_KEYS;
     if (_children.empty()) {
@@ -204,6 +222,10 @@ Status VCollectIterator::next(IteratorRowRef* ref) {
 }
 
 Status VCollectIterator::next(Block* block) {
+    if (use_topn_next()) {
+        return _topn_next(block);
+    }
+
     if (LIKELY(_inner_iter)) {
         return _inner_iter->next(block);
     } else {
@@ -211,6 +233,161 @@ Status VCollectIterator::next(Block* block) {
     }
 }
 
+Status VCollectIterator::_topn_next(Block* block) { // single-shot top-n read: fills `block` once, later calls return END_OF_FILE
+    if (_topn_eof) {
+        return Status::Error<END_OF_FILE>();
+    }
+
+    auto cloneBlock = block->clone_empty();
+    MutableBlock mutable_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+
+    size_t first_sort_column_idx = 
(*_reader->_reader_context.read_orderby_key_columns)[0]; // NOTE(review): dereferenced without a null check — confirm it is always set on this path
+    const std::vector<uint32_t>* sort_columns = 
_reader->_reader_context.read_orderby_key_columns;
+
+    BlockRowPosComparator row_pos_comparator(&mutable_block, sort_columns,
+                                             
_reader->_reader_context.read_orderby_key_reverse);
+    std::multiset<size_t, BlockRowPosComparator, std::allocator<size_t>> 
sorted_row_pos(
+            row_pos_comparator);
+
+    if (_is_reverse) {
+        std::reverse(_rs_readers.begin(), _rs_readers.end()); // visit the rowset readers in reverse order for a reversed scan
+    }
+
+    for (auto rs_reader : _rs_readers) {
+        // init will prune segment by _reader_context.conditions and 
_reader_context.runtime_conditions
+        RETURN_NOT_OK(rs_reader->init(&_reader->_reader_context));
+
+        // read from this rowset reader until _topn_limit rows (counted after filtering) were seen or it hits EOF
+        size_t read_rows = 0;
+        bool eof = false;
+        while (read_rows < _topn_limit && !eof) {
+            block->clear_column_data();
+            auto res = rs_reader->next_block(block);
+            if (!res.ok()) {
+                if (res.is<END_OF_FILE>()) {
+                    eof = true;
+                    if (block->rows() == 0) {
+                        break; // nothing came with the EOF, move on to the next rowset reader
+                    }
+                } else {
+                    return res;
+                }
+            }
+
+            auto col_name = block->get_names()[first_sort_column_idx]; // name of the first sort column, passed to the runtime predicate update below
+
+            // filter block with the vconjunct context handed over through the reader context
+            RETURN_IF_ERROR(VExprContext::filter_block(
+                    
*(_reader->_reader_context.filter_block_vconjunct_ctx_ptr), block,
+                    block->columns()));
+
+            // count the rows that survived the filter
+            read_rows += block->rows();
+
+            // insert block rows to mutable_block and adjust sorted_row_pos
+            bool changed = false;
+
+            size_t rows_to_copy = 0;
+            if (sorted_row_pos.empty()) {
+                rows_to_copy = std::min(block->rows(), _topn_limit);
+            } else {
+                // _is_reverse == true  last_row_pos is the pos of smallest row
+                // _is_reverse == false last_row_pos is biggest row
+                size_t last_row_pos = *sorted_row_pos.rbegin();
+
+                // find the how many rows which is less than the last row in 
mutable_block
+                for (size_t i = 0; i < block->rows(); i++) {
+                    // if there is not enough rows in sorted_row_pos, just 
copy new rows
+                    if (sorted_row_pos.size() + rows_to_copy < _topn_limit) {
+                        rows_to_copy++;
+                        continue;
+                    }
+
+                    DCHECK_GE(block->columns(), sort_columns->size());
+                    DCHECK_GE(mutable_block.columns(), sort_columns->size());
+
+                    int res = 0;
+                    for (auto j : *sort_columns) {
+                        DCHECK(block->get_by_position(j).type->equals(
+                                *mutable_block.get_datatype_by_position(j)));
+                        res = block->get_by_position(j).column->compare_at(
+                                i, last_row_pos, 
*(mutable_block.get_column_by_position(j)), 0);
+                        if (res) {
+                            break;
+                        }
+                    }
+
+                    // only copy needed rows
+                    // _is_reverse == true  > smallest is ok
+                    // _is_reverse == false < biggest is ok
+                    if ((_is_reverse && res > 0) || (!_is_reverse && res < 0)) 
{
+                        rows_to_copy++;
+                    } else {
+                        break; // stop at the first row that does not qualify; only a leading run of the block is copied
+                    }
+                }
+            }
+
+            if (rows_to_copy > 0) {
+                // create the columns that exist in block but not yet in mutable_block
+                for (size_t i = mutable_block.columns(); i < block->columns(); 
++i) {
+                    auto col = block->get_by_position(i).clone_empty();
+                    
mutable_block.mutable_columns().push_back(col.column->assume_mutable());
+                    mutable_block.data_types().push_back(std::move(col.type));
+                    mutable_block.get_names().push_back(std::move(col.name));
+                }
+
+                size_t base = mutable_block.rows();
+                // append the first rows_to_copy rows of block to mutable_block
+                mutable_block.add_rows(block, 0, rows_to_copy);
+                // insert appended rows pos in mutable_block to sorted_row_pos 
and sort it
+                for (size_t i = 0; i < rows_to_copy; i++) {
+                    sorted_row_pos.insert(base + i);
+                    changed = true;
+                }
+            }
+
+            // drop the tail of sorted_row_pos so that at most _topn_limit positions remain
+            if (sorted_row_pos.size() > _topn_limit) {
+                auto first = sorted_row_pos.begin();
+                for (size_t i = 0; i < _topn_limit; i++) {
+                    first++;
+                }
+                sorted_row_pos.erase(first, sorted_row_pos.end());
+                // TODO: mutable_block should also shrink
+            }
+
+            // update runtime_predicate once _topn_limit positions are held and new rows were added
+            if (_reader->_reader_context.use_topn_opt && changed &&
+                sorted_row_pos.size() >= _topn_limit) {
+                // get the first sort column's value of the last kept row
+                size_t last_sorted_row = *sorted_row_pos.rbegin();
+                auto col_ptr = 
mutable_block.get_column_by_position(first_sort_column_idx).get();
+                Field new_top;
+                col_ptr->get(last_sorted_row, new_top);
+
+                // update orderby_extrems in query global context
+                auto query_ctx = 
_reader->_reader_context.runtime_state->get_query_fragments_ctx();
+                RETURN_IF_ERROR(
+                        query_ctx->get_runtime_predicate().update(new_top, 
col_name, _is_reverse));
+            }
+        } // end of while (read_rows < _topn_limit && !eof)
+    }     // end of for (auto rs_reader : _rs_readers)
+
+    // hand every row accumulated in mutable_block back through block
+    // TODO only copy limit rows
+    *block = mutable_block.to_block();
+
+    _topn_eof = true; // every later call returns END_OF_FILE
+    return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>();
+}
+
+// Strict ordering of two row positions of the wrapped block: true when the row
+// at `lpos` compares lower than the row at `rpos` on the compare columns, or
+// higher when _is_reverse is set.
+bool VCollectIterator::BlockRowPosComparator::operator()(const size_t& lpos,
+                                                         const size_t& rpos) const {
+    const int order = _mutable_block->compare_at(lpos, rpos, _compare_columns, *_mutable_block,
+                                                 /*nan_direction_hint=*/0);
+    if (_is_reverse) {
+        return order > 0;
+    }
+    return order < 0;
+}
+
 VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr 
rs_reader,
                                                  TabletReader* reader)
         : LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) {
diff --git a/be/src/vec/olap/vcollect_iterator.h 
b/be/src/vec/olap/vcollect_iterator.h
index cf600e53ef..218c92e99e 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -71,7 +71,29 @@ public:
         return false;
     }
 
+    inline bool use_topn_next() const { return _topn_limit > 0; } // true once a positive _topn_limit is set
+
 private:
+    // next for topn query
+    Status _topn_next(Block* block);
+
+    // Orders row positions of one MutableBlock by a list of compare columns, so
+    // that positions can be kept in an ordered container.
+    class BlockRowPosComparator {
+    public:
+        // `block` is the block whose rows are compared, `columns` the column
+        // positions to compare, `reverse` flips the resulting order.
+        BlockRowPosComparator(MutableBlock* block, const std::vector<uint32_t>* columns,
+                              bool reverse)
+                : _mutable_block {block}, _compare_columns {columns}, _is_reverse {reverse} {}
+
+        bool operator()(const size_t& lpos, const size_t& rpos) const;
+
+    private:
+        const MutableBlock* _mutable_block = nullptr;
+        const std::vector<uint32_t>* _compare_columns;
+        // reverse the compare order
+        const bool _is_reverse = false;
+    };
+
     // This interface is the actual implementation of the new version of 
iterator.
     // It currently contains two implementations, one is Level0Iterator,
     // which only reads data from the rowset reader, and the other is 
Level1Iterator,
@@ -291,6 +313,11 @@ private:
     bool _merge = true;
     // reverse the compare order
     bool _is_reverse = false;
+    // for topn next
+    size_t _topn_limit = 0;
+    bool _topn_eof = false;
+    std::vector<RowsetReaderSharedPtr> _rs_readers;
+
     // Hold reader point to access read params, such as fetch conditions.
     TabletReader* _reader = nullptr;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index f30b76b9bf..75c69af4f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -461,6 +461,16 @@ public class OlapScanNode extends ScanNode {
         return olapTable;
     }
 
+    /**
+     * Tells whether the scanned table uses DUP_KEYS, or UNIQUE_KEYS with
+     * merge-on-write enabled.
+     */
+    public boolean isDupKeysOrMergeOnWrite() {
+        if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
+            return true;
+        }
+        return olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+                && olapTable.getEnableUniqueKeyMergeOnWrite();
+    }
+
     @Override
     protected String debugString() {
         MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 0cce74f57e..21d5b18dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -196,7 +196,7 @@ public class OriginalPlanner extends Planner {
 
         // check and set flag for topn detail query opt
         if (VectorizedUtil.isVectorized()) {
-            checkTopnOpt(singleNodePlan);
+            checkAndSetTopnOpt(singleNodePlan);
         }
 
         if (queryOptions.num_nodes == 1 || queryStmt.isPointQuery()) {
@@ -443,7 +443,7 @@ public class OriginalPlanner extends Planner {
      * 2. limit > 0
      * 3. first expression of order by is a table column
      */
-    private void checkTopnOpt(PlanNode node) {
+    private void checkAndSetTopnOpt(PlanNode node) {
         if (node instanceof SortNode && node.getChildren().size() == 1) {
             SortNode sortNode = (SortNode) node;
             PlanNode child = sortNode.getChild(0);
@@ -453,8 +453,10 @@ public class OriginalPlanner extends Planner {
                 if (firstSortExpr instanceof SlotRef && 
!firstSortExpr.getType().isStringType()
                         && !firstSortExpr.getType().isFloatingPointType()) {
                     OlapScanNode scanNode = (OlapScanNode) child;
-                    sortNode.setUseTopnOpt(true);
-                    scanNode.setUseTopnOpt(true);
+                    if (scanNode.isDupKeysOrMergeOnWrite()) {
+                        sortNode.setUseTopnOpt(true);
+                        scanNode.setUseTopnOpt(true);
+                    }
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to