This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 737c73dcf0 [Improvement](topn) order by key topn query optimization
(#15663)
737c73dcf0 is described below
commit 737c73dcf015d21ab0f54b4c4ddb1b59eaa4466d
Author: Kang <[email protected]>
AuthorDate: Mon Feb 6 15:36:05 2023 +0800
[Improvement](topn) order by key topn query optimization (#15663)
---
be/src/olap/iterators.h | 3 +
be/src/olap/reader.cpp | 5 +-
be/src/olap/reader.h | 5 +
be/src/olap/rowset/beta_rowset_reader.cpp | 8 +
be/src/olap/rowset/beta_rowset_reader.h | 2 +
be/src/olap/rowset/rowset_reader_context.h | 5 +
.../rowset/segment_v2/empty_segment_iterator.h | 1 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 8 +-
be/src/runtime/runtime_predicate.cpp | 7 +
be/src/vec/core/block.cpp | 39 ++++-
be/src/vec/core/block.h | 52 +++++-
be/src/vec/exec/scan/new_olap_scanner.cpp | 8 +-
be/src/vec/olap/block_reader.cpp | 12 +-
be/src/vec/olap/vcollect_iterator.cpp | 177 +++++++++++++++++++++
be/src/vec/olap/vcollect_iterator.h | 27 ++++
.../org/apache/doris/planner/OlapScanNode.java | 10 ++
.../org/apache/doris/planner/OriginalPlanner.java | 10 +-
17 files changed, 365 insertions(+), 14 deletions(-)
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 36f765ac88..61ce931e52 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -175,6 +175,9 @@ public:
virtual bool update_profile(RuntimeProfile* profile) { return false; }
// return rows merged count by iterator
virtual uint64_t merged_rows() const { return 0; }
+
+ // return if it's an empty iterator
+ virtual bool empty() const { return false; }
};
} // namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index babc41a10f..2207836650 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -204,6 +204,8 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params,
_reader_context.need_ordered_result = need_ordered_result;
_reader_context.use_topn_opt = read_params.use_topn_opt;
_reader_context.read_orderby_key_reverse =
read_params.read_orderby_key_reverse;
+ _reader_context.read_orderby_key_limit =
read_params.read_orderby_key_limit;
+ _reader_context.filter_block_vconjunct_ctx_ptr =
read_params.filter_block_vconjunct_ctx_ptr;
_reader_context.return_columns = &_return_columns;
_reader_context.read_orderby_key_columns =
_orderby_key_columns.size() > 0 ? &_orderby_key_columns : nullptr;
@@ -418,7 +420,8 @@ Status TabletReader::_init_orderby_keys_param(const
ReaderParams& read_params) {
}
// UNIQUE_KEYS will compare all keys as before
- if (_tablet_schema->keys_type() == DUP_KEYS) {
+ if (_tablet_schema->keys_type() == DUP_KEYS ||
(_tablet_schema->keys_type() == UNIQUE_KEYS &&
+
_tablet->enable_unique_key_merge_on_write())) {
// find index in vector _return_columns
// for the read_orderby_key_num_prefix_columns orderby keys
for (uint32_t i = 0; i <
read_params.read_orderby_key_num_prefix_columns; i++) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index ea74256e1e..b9bafd6fed 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -28,6 +28,7 @@
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "util/runtime_profile.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris {
@@ -120,6 +121,10 @@ public:
bool read_orderby_key_reverse = false;
// num of columns for orderby key
size_t read_orderby_key_num_prefix_columns = 0;
+ // limit of rows for read_orderby_key
+ size_t read_orderby_key_limit = 0;
+ // filter_block arguments
+ vectorized::VExprContext** filter_block_vconjunct_ctx_ptr = nullptr;
// for vertical compaction
bool is_key_column_group = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 3b45b3cf3f..0852641a03 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -178,9 +178,13 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() <<
"]: " << s.to_string();
return Status::Error<ROWSET_READER_INIT>();
}
+ if (iter->empty()) {
+ continue;
+ }
seg_iterators.push_back(std::move(iter));
}
+ std::vector<RowwiseIterator*> iterators;
for (auto& owned_it : seg_iterators) {
auto st = owned_it->init(_read_options);
if (!st.ok()) {
@@ -223,6 +227,10 @@ Status BetaRowsetReader::init(RowsetReaderContext*
read_context) {
Status BetaRowsetReader::next_block(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
+ if (_empty) {
+ return Status::Error<END_OF_FILE>();
+ }
+
do {
auto s = _iterator->next_batch(block);
if (!s.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index dea6814558..53bc73c156 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -84,6 +84,8 @@ private:
SegmentCacheHandle _segment_cache_handle;
StorageReadOptions _read_options;
+
+ bool _empty = false;
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index 826f753080..cd52884fc9 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -22,6 +22,7 @@
#include "olap/olap_common.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris {
@@ -42,6 +43,10 @@ struct RowsetReaderContext {
bool read_orderby_key_reverse = false;
// columns for orderby keys
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
+ // limit of rows for read_orderby_key
+ size_t read_orderby_key_limit = 0;
+ // filter_block arguments
+ vectorized::VExprContext** filter_block_vconjunct_ctx_ptr = nullptr;
// projection columns: the set of columns rowset reader should return
const std::vector<uint32_t>* return_columns = nullptr;
TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
diff --git a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
index 5571ee6756..0bd7014c61 100644
--- a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
@@ -32,6 +32,7 @@ public:
Status init(const StorageReadOptions& opts) override { return
Status::OK(); }
const Schema& schema() const override { return _schema; }
Status next_batch(vectorized::Block* block) override;
+ bool empty() const override { return true; }
private:
const Schema& _schema;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 99721e292a..ac1c058f27 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -969,7 +969,13 @@ void SegmentIterator::_vec_init_lazy_materialization() {
}
// add runtime predicate to _col_predicates
- if (_opts.use_topn_opt) {
+ // should NOT add for order by key,
+ // since key is already sorted and topn_next only need first N rows from
each segment,
+ // but runtime predicate will filter some rows and read more than N rows.
+ // should add for order by non-key column, since non-key column is not sorted and
+ // all rows should be read, so runtime predicate will reduce rows for topn node
+ if (_opts.use_topn_opt &&
+ !(_opts.read_orderby_key_columns != nullptr &&
!_opts.read_orderby_key_columns->empty())) {
auto& runtime_predicate =
_opts.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
_runtime_predicate = runtime_predicate.get_predictate();
diff --git a/be/src/runtime/runtime_predicate.cpp
b/be/src/runtime/runtime_predicate.cpp
index 0ee4f8915a..437fa49399 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -113,6 +113,11 @@ Status RuntimePredicate::init(const PrimitiveType type) {
Status RuntimePredicate::update(const Field& value, const String& col_name,
bool is_reverse) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ // TODO: _tablet_schema can be null here (cause not yet understood); skip the update in that case.
+ if (!_tablet_schema) {
+ return Status::OK();
+ }
+
bool updated = false;
if (UNLIKELY(_orderby_extrem.is_null())) {
@@ -142,6 +147,8 @@ Status RuntimePredicate::update(const Field& value, const
String& col_name, bool
// get value string from _orderby_extrem and push back to condition_values
condition.condition_values.push_back(_get_value_fn(_orderby_extrem));
+ VLOG_DEBUG << "update runtime predicate condition " << condition;
+
// update _predictate
_predictate.reset(
parse_to_predicate(_tablet_schema, condition,
_predicate_mem_pool.get(), false));
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index b40cdc76b6..b608a6ac49 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -830,8 +830,10 @@ MutableBlock::MutableBlock(const
std::vector<TupleDescriptor*>& tuple_descs, int
if (reserve_size != 0) {
_columns.back()->reserve(reserve_size);
}
+ _names.push_back(slot_desc->col_name());
}
}
+ initialize_index_by_name();
}
size_t MutableBlock::rows() const {
@@ -847,12 +849,16 @@ size_t MutableBlock::rows() const {
void MutableBlock::swap(MutableBlock& another) noexcept {
_columns.swap(another._columns);
_data_types.swap(another._data_types);
+    _names.swap(another._names);
+    // Both blocks' names changed, so both name indexes must be rebuilt.
+    // Each map is cleared first so entries for the previous column set do not
+    // linger. The index is rebuilt from `_names` rather than swapped, because
+    // get_names() hands out a mutable reference and the map may not reflect it.
+    index_by_name.clear();
+    initialize_index_by_name();
+    another.index_by_name.clear();
+    another.initialize_index_by_name();
}
void MutableBlock::swap(MutableBlock&& another) noexcept {
clear();
_columns = std::move(another._columns);
_data_types = std::move(another._data_types);
+    _names = std::move(another._names);
+    // Drop any stale entries for the previous column set before re-indexing
+    // the newly adopted names.
+    index_by_name.clear();
+    initialize_index_by_name();
}
void MutableBlock::add_row(const Block* block, int row) {
@@ -887,7 +893,7 @@ Block MutableBlock::to_block(int start_column) {
Block MutableBlock::to_block(int start_column, int end_column) {
ColumnsWithTypeAndName columns_with_schema;
for (size_t i = start_column; i < end_column; ++i) {
- columns_with_schema.emplace_back(std::move(_columns[i]),
_data_types[i], "");
+ columns_with_schema.emplace_back(std::move(_columns[i]),
_data_types[i], _names[i]);
}
return {columns_with_schema};
}
@@ -977,4 +983,35 @@ void MutableBlock::clear_column_data() noexcept {
}
}
+// Rebuild the name -> column-position lookup from `_names`.
+// The map is cleared first so that names which are no longer present (after
+// `_names` was swapped, moved or replaced) are not reported by has() or
+// get_position_by_name(). If a name appears more than once, the last
+// position wins.
+void MutableBlock::initialize_index_by_name() {
+    index_by_name.clear();
+    for (size_t i = 0, size = _names.size(); i < size; ++i) {
+        index_by_name[_names[i]] = i;
+    }
+}
+
+// Returns true if a column called `name` is present in the name index.
+bool MutableBlock::has(const std::string& name) const {
+    const auto found = index_by_name.find(name);
+    return found != index_by_name.end();
+}
+
+// Returns the column position registered for `name` in the name index.
+// A missing name is reported through LOG(FATAL), with the list of available
+// column names; callers that can tolerate absence should check has() first.
+size_t MutableBlock::get_position_by_name(const std::string& name) const {
+    const auto found = index_by_name.find(name);
+    if (found == index_by_name.end()) {
+        const std::string message = fmt::format(
+                "Not found column {} in block. There are only columns: {}", name, dump_names());
+        LOG(FATAL) << message;
+    }
+    return found->second;
+}
+
+// Returns all column names of this block in position order, separated by ", "
+// (used for diagnostics such as the get_position_by_name() error message).
+std::string MutableBlock::dump_names() const {
+    std::string joined;
+    for (size_t pos = 0; pos < _names.size(); ++pos) {
+        if (pos > 0) {
+            joined += ", ";
+        }
+        joined += _names[pos];
+    }
+    return joined;
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 959480f479..d5032cd568 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -380,6 +380,10 @@ class MutableBlock {
private:
MutableColumns _columns;
DataTypes _data_types;
+ Names _names;
+
+ using IndexByName = phmap::flat_hash_map<String, size_t>;
+ IndexByName index_by_name;
public:
static MutableBlock build_mutable_block(Block* block) {
@@ -392,13 +396,23 @@ public:
bool igore_trivial_slot = false);
MutableBlock(Block* block)
- : _columns(block->mutate_columns()),
_data_types(block->get_data_types()) {}
+ : _columns(block->mutate_columns()),
+ _data_types(block->get_data_types()),
+ _names(block->get_names()) {
+ initialize_index_by_name();
+ }
MutableBlock(Block&& block)
- : _columns(block.mutate_columns()),
_data_types(block.get_data_types()) {}
+ : _columns(block.mutate_columns()),
+ _data_types(block.get_data_types()),
+ _names(block.get_names()) {
+ initialize_index_by_name();
+ }
void operator=(MutableBlock&& m_block) {
_columns = std::move(m_block._columns);
_data_types = std::move(m_block._data_types);
+        _names = std::move(m_block._names);
+        // Drop entries for the previous column set before re-indexing the
+        // adopted names; otherwise has() could report columns that are gone.
+        index_by_name.clear();
+        initialize_index_by_name();
}
size_t rows() const;
@@ -439,10 +453,30 @@ public:
}
return 0;
}
+
+    // Compare row `n` of this block with row `m` of `rhs`, using only the
+    // column positions listed in `compare_columns`, in list order.
+    // Returns the first non-zero result of the per-column compare_at, or 0 if
+    // every listed column compares equal. `nan_direction_hint` is forwarded to
+    // each column comparison unchanged.
+    int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
+                   const MutableBlock& rhs, int nan_direction_hint) const {
+        DCHECK(compare_columns != nullptr);
+        DCHECK_GE(columns(), compare_columns->size());
+        DCHECK_GE(rhs.columns(), compare_columns->size());
+
+        // n and m are row indices, so they must be strictly below the row count.
+        DCHECK_LT(n, rows());
+        DCHECK_LT(m, rhs.rows());
+        for (auto i : *compare_columns) {
+            // Every listed position must exist in both blocks.
+            DCHECK_LT(i, columns());
+            DCHECK_LT(i, rhs.columns());
+            DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
+            auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
+                                                             nan_direction_hint);
+            if (res) {
+                return res;
+            }
+        }
+        return 0;
+    }
+
template <typename T>
void merge(T&& block) {
if (_columns.size() == 0 && _data_types.size() == 0) {
_data_types = block.get_data_types();
+ _names = block.get_names();
_columns.resize(block.columns());
for (size_t i = 0; i < block.columns(); ++i) {
if (block.get_by_position(i).column) {
@@ -453,6 +487,7 @@ public:
_columns[i] = _data_types[i]->create_column();
}
}
+ initialize_index_by_name();
} else {
DCHECK_EQ(_columns.size(), block.columns());
for (int i = 0; i < _columns.size(); ++i) {
@@ -495,6 +530,7 @@ public:
void clear() {
_columns.clear();
_data_types.clear();
+ _names.clear();
}
void clear_column_data() noexcept;
@@ -509,6 +545,18 @@ public:
return res;
}
+
+ Names& get_names() { return _names; }
+
+ bool has(const std::string& name) const;
+
+ size_t get_position_by_name(const std::string& name) const;
+
+ /** Get a list of column names separated by commas. */
+ std::string dump_names() const;
+
+private:
+ void initialize_index_by_name();
};
struct IteratorRowRef {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0af436af00..0bd26eba74 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -338,10 +338,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
}
_tablet_reader_params.read_orderby_key_num_prefix_columns =
olap_scan_node.sort_info.is_asc_order.size();
+ _tablet_reader_params.read_orderby_key_limit = _limit;
+ _tablet_reader_params.filter_block_vconjunct_ctx_ptr =
&_vconjunct_ctx;
}
- }
- _tablet_reader_params.use_topn_opt =
((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt;
+ // runtime predicate push down optimization for topn
+ _tablet_reader_params.use_topn_opt =
+ ((NewOlapScanNode*)_parent)->_olap_scan_node.use_topn_opt;
+ }
return Status::OK();
}
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 51a4457ce5..b872a80b1f 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -79,7 +79,10 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params,
_reader_context.is_vec = true;
_reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
for (auto& rs_reader : rs_readers) {
- RETURN_NOT_OK(rs_reader->init(&_reader_context));
+ // _vcollect_iter.topn_next() will init rs_reader by itself
+ if (!_vcollect_iter.use_topn_next()) {
+ RETURN_NOT_OK(rs_reader->init(&_reader_context));
+ }
Status res = _vcollect_iter.add_child(rs_reader);
if (!res.ok() && !res.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to add child to iterator, err=" << res;
@@ -91,8 +94,11 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params,
}
RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
- auto status = _vcollect_iter.current_row(&_next_row);
- _eof = status.is<END_OF_FILE>();
+ // _vcollect_iter.topn_next() can not use current_row
+ if (!_vcollect_iter.use_topn_next()) {
+ auto status = _vcollect_iter.current_row(&_next_row);
+ _eof = status.is<END_OF_FILE>();
+ }
return Status::OK();
}
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 84d323a084..3a666dd61b 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -57,9 +57,23 @@ void VCollectIterator::init(TabletReader* reader, bool
ori_data_overlapping, boo
_merge = true;
}
_is_reverse = is_reverse;
+ // use topn_next opt only for DUP_KEYS and UNIQUE_KEYS with MOW
+ if (_reader->_reader_context.read_orderby_key_limit > 0 &&
+ (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
+ (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+ _reader->_tablet->enable_unique_key_merge_on_write()))) {
+ _topn_limit = _reader->_reader_context.read_orderby_key_limit;
+ } else {
+ _topn_limit = 0;
+ }
}
Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
+ if (use_topn_next()) {
+ _rs_readers.push_back(rs_reader);
+ return Status::OK();
+ }
+
std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader,
_reader));
_children.push_back(child.release());
return Status::OK();
@@ -69,6 +83,10 @@ Status VCollectIterator::add_child(RowsetReaderSharedPtr
rs_reader) {
// status will be used as the base rowset, and the other rowsets will be
merged first and
// then merged with the base rowset.
Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>&
rs_readers) {
+ if (use_topn_next()) {
+ return Status::OK();
+ }
+
DCHECK(rs_readers.size() == _children.size());
_skip_same = _reader->_tablet_schema->keys_type() == KeysType::UNIQUE_KEYS;
if (_children.empty()) {
@@ -204,6 +222,10 @@ Status VCollectIterator::next(IteratorRowRef* ref) {
}
Status VCollectIterator::next(Block* block) {
+ if (use_topn_next()) {
+ return _topn_next(block);
+ }
+
if (LIKELY(_inner_iter)) {
return _inner_iter->next(block);
} else {
@@ -211,6 +233,161 @@ Status VCollectIterator::next(Block* block) {
}
}
+// Top-N read path for ORDER BY <key prefix> LIMIT N, used when init() set a
+// positive _topn_limit. For each rowset reader it reads blocks until at least
+// _topn_limit rows have survived the pushed-down conjuncts, or the reader is
+// exhausted. Candidate rows are appended to a local MutableBlock, and the best
+// _topn_limit row positions are tracked in `sorted_row_pos`.
+// The whole result is produced by the first call; later calls return END_OF_FILE.
+// NOTE(review): `*block` receives every row ever appended to `mutable_block`,
+// in append order. Positions dropped from `sorted_row_pos` are not removed
+// (see the TODOs below), so the caller presumably re-sorts and re-applies the
+// limit. Confirm against the upstream sort node.
+Status VCollectIterator::_topn_next(Block* block) {
+ if (_topn_eof) {
+ return Status::Error<END_OF_FILE>();
+ }
+
+ // `mutable_block` accumulates candidate rows. It starts with the same
+ // columns as `block`, but no data.
+ auto cloneBlock = block->clone_empty();
+ MutableBlock mutable_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+
+ size_t first_sort_column_idx =
(*_reader->_reader_context.read_orderby_key_columns)[0];
+ const std::vector<uint32_t>* sort_columns =
_reader->_reader_context.read_orderby_key_columns;
+
+ // Row positions of mutable_block, ordered by the sort-key columns
+ // (descending when read_orderby_key_reverse is set). rbegin() is therefore
+ // the last row that is still inside the current top N.
+ BlockRowPosComparator row_pos_comparator(&mutable_block, sort_columns,
+
_reader->_reader_context.read_orderby_key_reverse);
+ std::multiset<size_t, BlockRowPosComparator, std::allocator<size_t>>
sorted_row_pos(
+ row_pos_comparator);
+
+ // Reverses the member vector in place. This is safe only because
+ // _topn_eof guarantees the method body runs once.
+ if (_is_reverse) {
+ std::reverse(_rs_readers.begin(), _rs_readers.end());
+ }
+
+ for (auto rs_reader : _rs_readers) {
+ // init will prune segments by _reader_context.conditions and _reader_context.runtime_conditions
+ RETURN_NOT_OK(rs_reader->init(&_reader->_reader_context));
+
+ // read up to _topn_limit (post-filter) rows from this rowset reader
+ size_t read_rows = 0;
+ bool eof = false;
+ while (read_rows < _topn_limit && !eof) {
+ block->clear_column_data();
+ auto res = rs_reader->next_block(block);
+ if (!res.ok()) {
+ if (res.is<END_OF_FILE>()) {
+ eof = true;
+ if (block->rows() == 0) {
+ break;
+ }
+ } else {
+ return res;
+ }
+ }
+
+ // Name of the first sort column, captured before filter_block runs;
+ // used as the column name for the runtime predicate update below.
+ auto col_name = block->get_names()[first_sort_column_idx];
+
+ // apply the scanner's conjuncts to the block
+ RETURN_IF_ERROR(VExprContext::filter_block(
+
*(_reader->_reader_context.filter_block_vconjunct_ctx_ptr), block,
+ block->columns()));
+
+ // count only rows that survived the filter
+ read_rows += block->rows();
+
+ // append qualifying rows to mutable_block and register their positions in sorted_row_pos
+ bool changed = false;
+
+ size_t rows_to_copy = 0;
+ if (sorted_row_pos.empty()) {
+ rows_to_copy = std::min(block->rows(), _topn_limit);
+ } else {
+ // _is_reverse == true: last_row_pos is the position of the smallest kept row
+ // _is_reverse == false: last_row_pos is the position of the biggest kept row
+ size_t last_row_pos = *sorted_row_pos.rbegin();
+
+ // Count how many leading rows of `block` sort strictly before the last
+ // kept row. The scan stops at the first row that does not qualify.
+ // NOTE(review): this presumably relies on rows from one rowset reader
+ // arriving in sort-key order. Confirm for overlapping segments.
+ for (size_t i = 0; i < block->rows(); i++) {
+ // while fewer than _topn_limit positions are kept, copy new rows unconditionally
+ if (sorted_row_pos.size() + rows_to_copy < _topn_limit) {
+ rows_to_copy++;
+ continue;
+ }
+
+ DCHECK_GE(block->columns(), sort_columns->size());
+ DCHECK_GE(mutable_block.columns(), sort_columns->size());
+
+ int res = 0;
+ for (auto j : *sort_columns) {
+ DCHECK(block->get_by_position(j).type->equals(
+ *mutable_block.get_datatype_by_position(j)));
+ res = block->get_by_position(j).column->compare_at(
+ i, last_row_pos,
*(mutable_block.get_column_by_position(j)), 0);
+ if (res) {
+ break;
+ }
+ }
+
+ // only copy rows that improve on the current last kept row
+ // _is_reverse == true: must be > the smallest kept row
+ // _is_reverse == false: must be < the biggest kept row
+ if ((_is_reverse && res > 0) || (!_is_reverse && res < 0))
{
+ rows_to_copy++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (rows_to_copy > 0) {
+ // Add any trailing columns that exist in block but not yet in mutable_block.
+ // NOTE(review): names are appended via get_names() without rebuilding the
+ // name index, and the new columns start empty. If mutable_block already
+ // holds rows they would be shorter than the existing columns, so confirm
+ // this only happens before the first append.
+ for (size_t i = mutable_block.columns(); i < block->columns();
++i) {
+ auto col = block->get_by_position(i).clone_empty();
+
mutable_block.mutable_columns().push_back(col.column->assume_mutable());
+ mutable_block.data_types().push_back(std::move(col.type));
+ mutable_block.get_names().push_back(std::move(col.name));
+ }
+
+ size_t base = mutable_block.rows();
+ // append the first rows_to_copy rows of block to mutable_block
+ mutable_block.add_rows(block, 0, rows_to_copy);
+ // register the appended rows' positions; the multiset keeps them ordered
+ for (size_t i = 0; i < rows_to_copy; i++) {
+ sorted_row_pos.insert(base + i);
+ changed = true;
+ }
+ }
+
+ // keep only the best _topn_limit positions
+ if (sorted_row_pos.size() > _topn_limit) {
+ auto first = sorted_row_pos.begin();
+ for (size_t i = 0; i < _topn_limit; i++) {
+ first++;
+ }
+ sorted_row_pos.erase(first, sorted_row_pos.end());
+ // TODO: mutable_block should also shrink (dropped rows stay in mutable_block and reach the output)
+ }
+
+ // Once the top N is full and changed, publish the first sort column's
+ // value of the last kept row as the new bound for the query's runtime predicate.
+ if (_reader->_reader_context.use_topn_opt && changed &&
+ sorted_row_pos.size() >= _topn_limit) {
+ // read the bound value from the first sort column
+ size_t last_sorted_row = *sorted_row_pos.rbegin();
+ auto col_ptr =
mutable_block.get_column_by_position(first_sort_column_idx).get();
+ Field new_top;
+ col_ptr->get(last_sorted_row, new_top);
+
+ // update orderby_extrems in the query-global context
+ auto query_ctx =
_reader->_reader_context.runtime_state->get_query_fragments_ctx();
+ RETURN_IF_ERROR(
+ query_ctx->get_runtime_predicate().update(new_top,
col_name, _is_reverse));
+ }
+ } // end of while (read_rows < _topn_limit && !eof)
+ } // end of for (auto rs_reader : _rs_readers)
+
+ // Move all accumulated rows (in append order, not trimmed to the limit) into block.
+ // TODO only copy limit rows
+ *block = mutable_block.to_block();
+
+ _topn_eof = true;
+ return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>();
+}
+
+// Returns true when row `lpos` of `_mutable_block` orders strictly before row
+// `rpos` of the same block, comparing the `_compare_columns` in order
+// (ascending, or descending when `_is_reverse` is set).
+bool VCollectIterator::BlockRowPosComparator::operator()(const size_t& lpos,
+                                                         const size_t& rpos) const {
+    const MutableBlock& rows = *_mutable_block;
+    const int cmp = rows.compare_at(lpos, rpos, _compare_columns, rows, 0);
+    if (_is_reverse) {
+        return cmp > 0;
+    }
+    return cmp < 0;
+}
+
VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr
rs_reader,
TabletReader* reader)
: LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) {
diff --git a/be/src/vec/olap/vcollect_iterator.h
b/be/src/vec/olap/vcollect_iterator.h
index cf600e53ef..218c92e99e 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -71,7 +71,29 @@ public:
return false;
}
+ // True when the top-N read path (_topn_next) is active, i.e. init() set _topn_limit > 0.
+ inline bool use_topn_next() const { return _topn_limit > 0; }
+
private:
+ // next for topn query
+ Status _topn_next(Block* block);
+
+ // Orders row positions of a single MutableBlock by the columns listed in
+ // `compare_columns` (via MutableBlock::compare_at); `is_reverse` flips the
+ // order. The block and column list are held by raw pointer, not owned, so
+ // they must outlive the comparator.
+ class BlockRowPosComparator {
+ public:
+ BlockRowPosComparator(MutableBlock* mutable_block,
+ const std::vector<uint32_t>* compare_columns,
bool is_reverse)
+ : _mutable_block(mutable_block),
+ _compare_columns(compare_columns),
+ _is_reverse(is_reverse) {}
+
+ // Returns true if row `lpos` orders strictly before row `rpos`.
+ bool operator()(const size_t& lpos, const size_t& rpos) const;
+
+ private:
+ // block whose rows are compared (not owned)
+ const MutableBlock* _mutable_block = nullptr;
+ // column positions used as the sort key, compared in order (not owned)
+ const std::vector<uint32_t>* _compare_columns;
+ // reverse the compare order
+ const bool _is_reverse = false;
+ };
+
// This interface is the actual implementation of the new version of
iterator.
// It currently contains two implementations, one is Level0Iterator,
// which only reads data from the rowset reader, and the other is
Level1Iterator,
@@ -291,6 +313,11 @@ private:
bool _merge = true;
// reverse the compare order
bool _is_reverse = false;
+ // for topn next
+ size_t _topn_limit = 0;
+ bool _topn_eof = false;
+ std::vector<RowsetReaderSharedPtr> _rs_readers;
+
// Hold reader point to access read params, such as fetch conditions.
TabletReader* _reader = nullptr;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index f30b76b9bf..75c69af4f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -461,6 +461,16 @@ public class OlapScanNode extends ScanNode {
return olapTable;
}
+ public boolean isDupKeysOrMergeOnWrite() {
+ if (olapTable.getKeysType() == KeysType.DUP_KEYS
+ || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
+ && olapTable.getEnableUniqueKeyMergeOnWrite())) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
protected String debugString() {
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 0cce74f57e..21d5b18dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -196,7 +196,7 @@ public class OriginalPlanner extends Planner {
// check and set flag for topn detail query opt
if (VectorizedUtil.isVectorized()) {
- checkTopnOpt(singleNodePlan);
+ checkAndSetTopnOpt(singleNodePlan);
}
if (queryOptions.num_nodes == 1 || queryStmt.isPointQuery()) {
@@ -443,7 +443,7 @@ public class OriginalPlanner extends Planner {
* 2. limit > 0
* 3. first expression of order by is a table column
*/
- private void checkTopnOpt(PlanNode node) {
+ private void checkAndSetTopnOpt(PlanNode node) {
if (node instanceof SortNode && node.getChildren().size() == 1) {
SortNode sortNode = (SortNode) node;
PlanNode child = sortNode.getChild(0);
@@ -453,8 +453,10 @@ public class OriginalPlanner extends Planner {
if (firstSortExpr instanceof SlotRef &&
!firstSortExpr.getType().isStringType()
&& !firstSortExpr.getType().isFloatingPointType()) {
OlapScanNode scanNode = (OlapScanNode) child;
- sortNode.setUseTopnOpt(true);
- scanNode.setUseTopnOpt(true);
+ if (scanNode.isDupKeysOrMergeOnWrite()) {
+ sortNode.setUseTopnOpt(true);
+ scanNode.setUseTopnOpt(true);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]