This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f3ab55d27d [Optimization](index) Optimization for no need to read raw
data for index column that only in where clause (#16569)
f3ab55d27d is described below
commit f3ab55d27d3232daf0d50acd3f06ff13bbdfddd0
Author: YueW <[email protected]>
AuthorDate: Tue Feb 14 00:12:45 2023 +0800
[Optimization](index) Optimization for no need to read raw data for index
column that only in where clause (#16569)
---
be/src/exec/olap_common.h | 10 ++
be/src/olap/column_predicate.h | 1 +
be/src/olap/iterators.h | 1 +
be/src/olap/reader.cpp | 11 +-
be/src/olap/reader.h | 3 +
be/src/olap/rowset/beta_rowset_reader.cpp | 1 +
be/src/olap/rowset/rowset_reader_context.h | 1 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 162 ++++++++++++++++++++-
be/src/olap/rowset/segment_v2/segment_iterator.h | 41 +++++-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 9 ++
be/src/vec/exec/scan/new_olap_scan_node.h | 2 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 1 +
be/src/vec/exec/scan/vscan_node.cpp | 21 ++-
be/src/vec/exec/scan/vscan_node.h | 1 +
.../apache/doris/analysis/AggregateInfoBase.java | 5 +
.../java/org/apache/doris/analysis/SortInfo.java | 8 +
.../org/apache/doris/planner/OlapScanNode.java | 9 ++
.../org/apache/doris/planner/OriginalPlanner.java | 71 +++++++++
.../java/org/apache/doris/planner/PlanNode.java | 4 +
.../doris/planner/TableFunctionPlanTest.java | 2 +-
gensrc/thrift/PaloInternalService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 1 +
.../test_add_drop_index_with_delete_data.groovy | 2 +-
23 files changed, 358 insertions(+), 10 deletions(-)
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index ca3710002d..19548b59f3 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -199,6 +199,7 @@ public:
if (TYPE_MIN != _low_value || FILTER_LARGER_OR_EQUAL != _low_op) {
low.__set_column_name(_column_name);
low.__set_condition_op((_low_op == FILTER_LARGER_OR_EQUAL ?
">=" : ">>"));
+
low.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
low.condition_values.push_back(
cast_to_string<primitive_type, CppType>(_low_value,
_scale));
}
@@ -211,6 +212,7 @@ public:
if (TYPE_MAX != _high_value || FILTER_LESS_OR_EQUAL != _high_op) {
high.__set_column_name(_column_name);
high.__set_condition_op((_high_op == FILTER_LESS_OR_EQUAL ?
"<=" : "<<"));
+
high.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
high.condition_values.push_back(
cast_to_string<primitive_type, CppType>(_high_value,
_scale));
}
@@ -237,6 +239,7 @@ public:
TCondition condition;
condition.__set_column_name(_column_name);
condition.__set_condition_op(is_in ? "*=" : "!*=");
+
condition.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
for (const auto& value : _fixed_values) {
condition.condition_values.push_back(
@@ -333,6 +336,12 @@ public:
_contain_null = contain_null;
}
+ // Flags whether the predicate that produced this value range came from a
+ // runtime filter. The flag is copied into each TCondition this range builds
+ // (via __set_marked_by_runtime_filter); storage-layer index paths then keep
+ // reading raw data for columns whose predicates carry this flag.
+ void mark_runtime_filter_predicate(bool is_runtime_filter_predicate) {
+ _marked_runtime_filter_predicate = is_runtime_filter_predicate;
+ }
+
+ // Returns the flag set by mark_runtime_filter_predicate (false by default).
+ bool get_marked_by_runtime_filter() const { return
_marked_runtime_filter_predicate; }
+
int precision() const { return _precision; }
int scale() const { return _scale; }
@@ -413,6 +422,7 @@ private:
// range value except leaf node of and node in compound expr tree
std::set<std::pair<SQLFilterOp, CppType>> _compound_values;
+ bool _marked_runtime_filter_predicate = false;
};
class OlapScanKeys {
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 2af921ddf4..c87adaafb8 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -34,6 +34,7 @@ class Schema;
struct PredicateParams {
std::string value;
+ bool marked_by_runtime_filter = false;
};
enum class PredicateType {
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 61ce931e52..3cc8da46ca 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -119,6 +119,7 @@ public:
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
IOContext io_ctx;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
+ const std::set<int32_t>* output_columns = nullptr;
// runtime state
RuntimeState* runtime_state = nullptr;
RowsetId rowset_id;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 0d116dd0ae..8d59085908 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params,
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.remaining_vconjunct_root =
read_params.remaining_vconjunct_root;
+ _reader_context.output_columns = &read_params.output_columns;
*valid_rs_readers = *rs_readers;
@@ -457,6 +458,7 @@ void TabletReader::_init_conditions_param(const
ReaderParams& read_params) {
// _gen_predicate_result_sign will build predicate result unique
sign with condition value
auto predicate_params = predicate->predicate_params();
predicate_params->value = condition.condition_values[0];
+ predicate_params->marked_by_runtime_filter =
condition.marked_by_runtime_filter;
if (_tablet_schema->column_by_uid(condition_col_uid).aggregation()
!=
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
@@ -476,7 +478,13 @@ void TabletReader::_init_conditions_param(const
ReaderParams& read_params) {
}
for (const auto& filter : read_params.in_filters) {
- _col_predicates.emplace_back(_parse_to_predicate(filter));
+ ColumnPredicate* predicate = _parse_to_predicate(filter);
+ if (predicate != nullptr) {
+ // in_filters from runtime filter predicates which pushed down to
data source.
+ auto predicate_params = predicate->predicate_params();
+ predicate_params->marked_by_runtime_filter = true;
+ }
+ _col_predicates.emplace_back(predicate);
}
// Function filter push down to storage engine
@@ -523,6 +531,7 @@ void
TabletReader::_init_conditions_param_except_leafnode_of_andnode(
parse_to_predicate(_tablet_schema, tmp_cond,
_predicate_mem_pool.get());
if (predicate != nullptr) {
auto predicate_params = predicate->predicate_params();
+ predicate_params->marked_by_runtime_filter =
condition.marked_by_runtime_filter;
predicate_params->value = condition.condition_values[0];
_col_preds_except_leafnode_of_andnode.push_back(predicate);
}
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 3abccc8044..4168341738 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -101,7 +101,10 @@ public:
DeleteBitmap* delete_bitmap {nullptr};
std::vector<RowsetReaderSharedPtr> rs_readers;
+ // return_columns is init from query schema
std::vector<uint32_t> return_columns;
+ // output_columns only contain columns in OrderByExprs and outputExprs
+ std::set<int32_t> output_columns;
RuntimeProfile* profile = nullptr;
RuntimeState* runtime_state = nullptr;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0f978e3c3a..2ba9e2fe85 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -163,6 +163,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.read_orderby_key_columns =
read_context->read_orderby_key_columns;
_read_options.io_ctx.reader_type = read_context->reader_type;
_read_options.runtime_state = read_context->runtime_state;
+ _read_options.output_columns = read_context->output_columns;
// load segments
// use cache is true when do vertica compaction
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index cd52884fc9..e342a5a824 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -77,6 +77,7 @@ struct RowsetReaderContext {
bool record_rowids = false;
bool is_vertical_compaction = false;
bool is_key_column_group = false;
+ const std::set<int32_t>* output_columns = nullptr;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 0f29a4e01a..39ada1e7d5 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -176,7 +176,14 @@ Status SegmentIterator::init(const StorageReadOptions&
opts) {
if (!opts.column_predicates_except_leafnode_of_andnode.empty()) {
_col_preds_except_leafnode_of_andnode =
opts.column_predicates_except_leafnode_of_andnode;
}
+
+ if (opts.output_columns != nullptr) {
+ _output_columns = *(opts.output_columns);
+ }
+
_remaining_vconjunct_root = opts.remaining_vconjunct_root;
+ _column_predicate_info.reset(new ColumnPredicateInfo());
+ _calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root);
_column_predicate_info.reset(new ColumnPredicateInfo());
if (_schema.rowid_col_idx() > 0) {
@@ -419,12 +426,19 @@ Status SegmentIterator::_apply_bitmap_index() {
for (auto pred : _col_predicates) {
int32_t unique_id = _schema.unique_id(pred->column_id());
if (_bitmap_index_iterators.count(unique_id) < 1 ||
- _bitmap_index_iterators[unique_id] == nullptr) {
+ _bitmap_index_iterators[unique_id] == nullptr || pred->type() ==
PredicateType::BF) {
// no bitmap index for this column
remaining_predicates.push_back(pred);
} else {
RETURN_IF_ERROR(pred->evaluate(_bitmap_index_iterators[unique_id],
_segment->num_rows(),
&_row_bitmap));
+
+ auto column_name = _schema.column(pred->column_id())->name();
+ if (_check_column_pred_all_push_down(column_name) &&
+ !pred->predicate_params()->marked_by_runtime_filter) {
+ _need_read_data_indices[unique_id] = false;
+ }
+
if (_row_bitmap.isEmpty()) {
break; // all rows have been pruned, no need to process
further predicates
}
@@ -518,7 +532,8 @@ Status SegmentIterator::_execute_compound_fn(const
std::string& function_name) {
bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
for (auto pred : _col_preds_except_leafnode_of_andnode) {
- if (!_check_apply_by_bitmap_index(pred) &&
!_check_apply_by_inverted_index(pred)) {
+ if (_not_apply_index_pred.count(pred->column_id()) ||
+ (!_check_apply_by_bitmap_index(pred) &&
!_check_apply_by_inverted_index(pred))) {
return false;
}
}
@@ -597,6 +612,7 @@ Status
SegmentIterator::_apply_index_except_leafnode_of_andnode() {
pred->type() != PredicateType::MATCH) ||
res.code() == ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT) {
// downgrade without index query
+ _not_apply_index_pred.insert(pred->column_id());
continue;
}
LOG(WARNING) << "failed to evaluate index"
@@ -610,6 +626,16 @@ Status
SegmentIterator::_apply_index_except_leafnode_of_andnode() {
std::make_pair(pred_result_sign, std::make_pair(true,
bitmap)));
}
+ for (auto pred : _col_preds_except_leafnode_of_andnode) {
+ auto column_name = _schema.column(pred->column_id())->name();
+ if (_remaining_vconjunct_root != nullptr &&
+ _check_column_pred_all_push_down(column_name, true) &&
+ !pred->predicate_params()->marked_by_runtime_filter) {
+ int32_t unique_id = _schema.unique_id(pred->column_id());
+ _need_read_data_indices[unique_id] = false;
+ }
+ }
+
return Status::OK();
}
@@ -645,6 +671,11 @@ bool
SegmentIterator::_is_handle_predicate_by_fulltext(int32_t unique_id) {
std::all_of(predicate_set.begin(), predicate_set.end(), \
[](const ColumnPredicate* p) { return
PredicateTypeTraits::is_range(p->type()); })
+// Evaluates to true when every ColumnPredicate* in `predicate_set` has
+// predicate_params()->marked_by_runtime_filter set. The result is vacuously
+// true for an empty set (std::all_of semantics). The const_cast exists because
+// the lambda receives a const pointer; presumably predicate_params() is a
+// non-const member — NOTE(review): confirm.
+// NOTE(review): the call site negates this (`!all_predicates_are_marked_...`),
+// which means "at least one predicate is not from a runtime filter". The
+// single-predicate paths instead require the predicate itself to be unmarked,
+// so `std::none_of` may be the intended check here — confirm.
+#define all_predicates_are_marked_by_runtime_filter(predicate_set)
\
+ std::all_of(predicate_set.begin(), predicate_set.end(), [](const
ColumnPredicate* p) { \
+ return
const_cast<ColumnPredicate*>(p)->predicate_params()->marked_by_runtime_filter; \
+ })
+
Status SegmentIterator::_apply_inverted_index_on_column_predicate(
ColumnPredicate* pred, std::vector<ColumnPredicate*>&
remaining_predicates,
bool* continue_apply) {
@@ -679,6 +710,12 @@ Status
SegmentIterator::_apply_inverted_index_on_column_predicate(
return res;
}
+ auto column_name = _schema.column(pred->column_id())->name();
+ if (_check_column_pred_all_push_down(column_name) &&
+ !pred->predicate_params()->marked_by_runtime_filter) {
+ _need_read_data_indices[unique_id] = false;
+ }
+
auto pred_type = pred->type();
if (pred_type == PredicateType::MATCH) {
std::string pred_result_sign = _gen_predicate_result_sign(pred);
@@ -721,6 +758,10 @@ Status
SegmentIterator::_apply_inverted_index_on_block_column_predicate(
&output_result);
if (res.ok()) {
+ if (_check_column_pred_all_push_down(column_name) &&
+ !all_predicates_are_marked_by_runtime_filter(predicate_set)) {
+ _need_read_data_indices[unique_id] = false;
+ }
no_need_to_pass_column_predicate_set.insert(predicate_set.begin(),
predicate_set.end());
_row_bitmap &= output_result;
if (_row_bitmap.isEmpty()) {
@@ -742,6 +783,17 @@ Status
SegmentIterator::_apply_inverted_index_on_block_column_predicate(
return Status::OK();
}
+// Returns whether the raw data of column `cid` must be read from the segment.
+// It returns false only when both of these hold:
+//   - an index pass has set _need_read_data_indices[unique_id] to false, i.e.
+//     the column's predicates were fully answered by an index;
+//   - the column's unique id is not in _output_columns, the columns used by
+//     the query's output and ORDER BY exprs.
+// In every other case it returns true.
+bool SegmentIterator::_need_read_data(ColumnId cid) {
+ int32_t unique_id = _schema.unique_id(cid);
+ // A column with no entry in _need_read_data_indices is always read.
+ if (_need_read_data_indices.count(unique_id) > 0 &&
!_need_read_data_indices[unique_id] &&
+ _output_columns.count(unique_id) < 1) {
+ VLOG_DEBUG << "SegmentIterator no need read data for column: "
+ << _opts.tablet_schema->column_by_uid(unique_id).name();
+ return false;
+ }
+ return true;
+}
+
Status SegmentIterator::_apply_inverted_index() {
SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
size_t input_rows = _row_bitmap.cardinality();
@@ -993,6 +1045,9 @@ Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids,
rowid_t pos) {
for (auto cid : column_ids) {
+ if (!_need_read_data(cid)) {
+ continue;
+ }
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->seek_to_ordinal(pos));
}
return Status::OK();
@@ -1201,11 +1256,33 @@ void SegmentIterator::_vec_init_char_column_id() {
}
}
+// Decides whether column `cid` can skip reading raw data (see _need_read_data).
+// - If the data must be read, returns false and leaves `column` untouched.
+// - Otherwise returns true. When `fill_defaults` is set, it first appends
+//   `num_of_defaults` default values to `column`. For a nullable column the
+//   defaults go into both its null map and its nested column. Presumably this
+//   keeps the pruned column's row count in step with the columns that are
+//   actually read.
+bool SegmentIterator::_prune_column(ColumnId cid,
vectorized::MutableColumnPtr& column,
+ bool fill_defaults, size_t
num_of_defaults) {
+ if (_need_read_data(cid)) {
+ return false;
+ }
+ if (!fill_defaults) {
+ return true;
+ }
+ // NOTE(review): the reinterpret_cast relies on is_nullable() implying that
+ // the concrete type is ColumnNullable. A checked cast (static_cast, or the
+ // project's assert_cast if available) would be safer — confirm.
+ if (column->is_nullable()) {
+ auto nullable_col_ptr =
reinterpret_cast<vectorized::ColumnNullable*>(column.get());
+
nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
+
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
+ } else {
+ // assert(column->is_const());
+ column->insert_many_defaults(num_of_defaults);
+ }
+ return true;
+}
+
Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
vectorized::MutableColumns&
column_block, size_t nrows) {
for (auto cid : column_ids) {
auto& column = column_block[cid];
size_t rows_read = nrows;
+ if (_prune_column(cid, column, true, rows_read)) {
+ continue;
+ }
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->next_batch(&rows_read,
column));
DCHECK_EQ(nrows, rows_read);
}
@@ -1370,7 +1447,8 @@ uint16_t
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
std::vector<rowid_t>&
rowid_vector,
- uint16_t* sel_rowid_idx,
size_t select_size) {
+ uint16_t* sel_rowid_idx,
size_t select_size,
+ vectorized::MutableColumns*
mutable_columns) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);
for (size_t i = 0; i < select_size; ++i) {
@@ -1378,6 +1456,9 @@ Status
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
}
for (auto cid : read_column_ids) {
+ if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) {
+ continue;
+ }
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->read_by_rowids(
rowids.data(), select_size, _current_return_columns[cid]));
}
@@ -1493,7 +1574,8 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
// step3: read non_predicate column
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns,
_block_rowids,
- sel_rowid_idx, selected_size));
+ sel_rowid_idx, selected_size,
+ &_current_return_columns));
// step4: output columns
// 4.1 output non-predicate column
@@ -1642,5 +1724,77 @@ Status
SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
return Status::OK();
}
+/**
+ * Returns true when every predicate on `column_name` is fully handled by the
+ * storage layer, i.e. no predicate left in the remaining vconjunct tree needs
+ * this column's raw data.
+ *
+ * Case 1 (in_compound == false): the WHERE clause consists only of leaf nodes
+ * of `and` nodes. Pushed-down predicates are removed from the vconjunct tree,
+ * so the column qualifies only if none of its predicates remain there.
+ * For example: where A = 1 and B = 'test' and B like '%he%';
+ *   column A: `A = 1` is pushed down, so all of A's predicates are pushed
+ *             down and this returns true.
+ *   column B: `B = 'test'` is pushed down, but `B like '%he%'` remains in the
+ *             vconjunct tree, so this returns false.
+ *
+ * Case 2 (in_compound == true): the WHERE clause contains compound or other
+ * complex conditions. Predicates are pushed down but also remain in the
+ * vconjunct tree, so the column qualifies only if every one of its remaining
+ * predicates already has an index result in _rowid_result_for_index.
+ * For example: where (A = 1 and B = 'test') or B = 'hi' or (C like '%ye%' and C > 'aa');
+ *   column A: `A = 1` is pushed down and applied by index -> true.
+ *   column B: `B = 'test'` and `B = 'hi'` are both pushed down and applied by
+ *             index -> true.
+ *   column C: `C like '%ye%'` is not pushed down; only `C > 'aa'` is applied
+ *             by index -> false.
+ *
+ * With no remaining vconjunct tree there is nothing left to evaluate, so the
+ * result is always true.
+ *
+ * @param column_name  column whose predicates are checked
+ * @param in_compound  selects case 2 (compound predicates) instead of case 1
+ */
+bool SegmentIterator::_check_column_pred_all_push_down(const std::string& column_name,
+                                                        bool in_compound) {
+    if (_remaining_vconjunct_root == nullptr) {
+        return true;
+    }
+
+    // Use find() rather than operator[]: this is a read-only query and must
+    // not insert an empty entry into _column_pred_in_remaining_vconjunct for
+    // every column it is asked about. It also avoids copying the vector.
+    auto iter = _column_pred_in_remaining_vconjunct.find(column_name);
+    if (iter == _column_pred_in_remaining_vconjunct.end()) {
+        // No predicate on this column remains in the vconjunct tree.
+        return true;
+    }
+
+    if (!in_compound) {
+        // Any remaining predicate means the column's raw data is still needed.
+        return iter->second.empty();
+    }
+
+    for (auto pred_info : iter->second) {
+        // Iterate by value, as before. _gen_predicate_result_sign receives a
+        // pointer, and this keeps the stored entries from being modified
+        // through it (SOURCE does not show whether the callee mutates it).
+        auto column_sign = _gen_predicate_result_sign(&pred_info);
+        if (_rowid_result_for_index.count(column_sign) < 1) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Walks the remaining vconjunct tree rooted at `expr` in post-order and groups
+// the predicates it finds by column name into
+// _column_pred_in_remaining_vconjunct. State accumulates in
+// _column_predicate_info:
+//   - a SLOT_REF node sets column_name;
+//   - a literal node sets query_value;
+//   - any other node sets query_op: "match" for MATCH_PRED, otherwise the
+//     function name. COMPOUND_PRED nodes leave query_op unchanged.
+// After a non-slot, non-literal node is processed, any non-empty accumulated
+// entry is appended under its column_name, and a fresh ColumnPredicateInfo is
+// started. Because children are visited first, a predicate's operands fill the
+// entry before the predicate node itself is reached.
+void SegmentIterator::_calculate_pred_in_remaining_vconjunct_root(const
vectorized::VExpr* expr) {
+ if (expr == nullptr) {
+ return;
+ }
+
+ // NOTE(review): `auto` copies the children container if children() returns
+ // a reference; `const auto&` would avoid that — confirm the return type.
+ auto children = expr->children();
+ for (int i = 0; i < children.size(); ++i) {
+ _calculate_pred_in_remaining_vconjunct_root(children[i]);
+ }
+
+ auto node_type = expr->node_type();
+ if (node_type == TExprNodeType::SLOT_REF) {
+ _column_predicate_info->column_name = expr->expr_name();
+ } else if (_is_literal_node(node_type)) {
+ auto v_literal_expr = static_cast<const
doris::vectorized::VLiteral*>(expr);
+ _column_predicate_info->query_value = v_literal_expr->value();
+ } else {
+ if (node_type == TExprNodeType::MATCH_PRED) {
+ _column_predicate_info->query_op = "match";
+ } else if (node_type != TExprNodeType::COMPOUND_PRED) {
+ _column_predicate_info->query_op = expr->fn().name.function_name;
+ }
+
+ // Flush the accumulated predicate (if any) and start a new one.
+ if (!_column_predicate_info->is_empty()) {
+
_column_pred_in_remaining_vconjunct[_column_predicate_info->column_name].push_back(
+ *_column_predicate_info);
+ _column_predicate_info.reset(new ColumnPredicateInfo());
+ }
+ }
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index f8ec46333b..b976fc53c7 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -51,6 +51,32 @@ class ColumnIterator;
struct ColumnPredicateInfo {
ColumnPredicateInfo() = default;
+
+ // Formats all three fields as "column_name=..., query_op=..., query_value=...".
+ std::string debug_string() const {
+ std::stringstream ss;
+ ss << "column_name=" << column_name << ", query_op=" << query_op
+ << ", query_value=" << query_value;
+ return ss.str();
+ }
+
+ // True when no field has been filled in yet (all three strings are empty).
+ bool is_empty() const { return column_name.empty() && query_value.empty()
&& query_op.empty(); }
+
+ // Field-wise equality on column_name, query_value and query_op.
+ bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
+ if (column_pred_info.column_name != column_name) {
+ return false;
+ }
+
+ if (column_pred_info.query_value != query_value) {
+ return false;
+ }
+
+ if (column_pred_info.query_op != query_op) {
+ return false;
+ }
+
+ return true;
+ }
+
std::string column_name;
std::string query_value;
std::string query_op;
@@ -167,7 +193,7 @@ private:
void _output_non_pred_columns(vectorized::Block* block);
Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx,
- size_t select_size);
+ size_t select_size,
vectorized::MutableColumns* mutable_columns);
template <class Container>
Status _output_column_by_sel_idx(vectorized::Block* block, const
Container& column_ids,
@@ -203,6 +229,14 @@ private:
void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t
select_size,
vectorized::Block* block);
+ bool _need_read_data(ColumnId cid);
+ bool _prune_column(ColumnId cid, vectorized::MutableColumnPtr& column,
bool fill_defaults,
+ size_t num_of_defaults);
+
+ // return true means one column's predicates all pushed down
+ bool _check_column_pred_all_push_down(const std::string& column_name, bool
in_compound = false);
+ void _calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr*
expr);
+
private:
// todo(wb) remove this method after RowCursor is removed
void _convert_rowcursor_to_short_key(const RowCursor& key, size_t
num_keys) {
@@ -297,6 +331,7 @@ private:
std::vector<ColumnId>
_short_cir_pred_column_ids; // keep columnId of columns for short
circuit predicate evaluation
std::vector<bool> _is_pred_column; // columns hold by segmentIter
+ std::map<uint32_t, bool> _need_read_data_indices;
vectorized::MutableColumns _current_return_columns;
std::vector<ColumnPredicate*> _pre_eval_block_predicate;
std::vector<ColumnPredicate*> _short_cir_eval_predicate;
@@ -320,8 +355,12 @@ private:
doris::vectorized::VExpr* _remaining_vconjunct_root;
std::vector<roaring::Roaring>
_pred_except_leafnode_of_andnode_evaluate_result;
std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;
+ std::unordered_map<std::string, std::vector<ColumnPredicateInfo>>
+ _column_pred_in_remaining_vconjunct;
+ std::set<ColumnId> _not_apply_index_pred;
std::shared_ptr<ColumnPredicate> _runtime_predicate {nullptr};
+ std::set<int32_t> _output_columns;
// row schema of the key to seek
// only used in `_get_row_ranges_by_keys`
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 0285189ef1..2978d806db 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -372,6 +372,15 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
(*_vconjunct_ctx_ptr)->root()->debug_string());
}
+ if (!_olap_scan_node.output_column_unique_ids.empty()) {
+ for (auto uid : _olap_scan_node.output_column_unique_ids) {
+ if (uid < 0) {
+ continue;
+ }
+ _maybe_read_column_ids.emplace(uid);
+ }
+ }
+
// ranges constructed from scan keys
std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index c7a346b6db..b179f31060 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -68,6 +68,8 @@ private:
// _compound_filters store conditions in the one compound relationship in
conjunct expr tree except leaf node of `and` node,
// such as: "(a or b) and (c or d)", conditions for a,b,c,d will be stored
std::vector<TCondition> _compound_filters;
+ // If column id in this set, indicate that we need to read data after
index filtering
+ std::set<int32_t> _maybe_read_column_ids;
private:
std::unique_ptr<RuntimeProfile> _segment_profile;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 8dc2e80e2f..a1d8ed0cea 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -215,6 +215,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.version = Version(0, _version);
_tablet_reader_params.remaining_vconjunct_root =
(_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root();
+ _tablet_reader_params.output_columns =
((NewOlapScanNode*)_parent)->_maybe_read_column_ids;
// Condition
for (auto& filter : filters) {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 53cf4930f5..f151a22014 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -511,6 +511,8 @@ Status VScanNode::_normalize_predicate(VExpr*
conjunct_expr_root, VExpr** output
auto impl = conjunct_expr_root->get_impl();
// If impl is not null, which means this a conjuncts from runtime
filter.
VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) :
conjunct_expr_root;
+ bool is_runtimer_filter_predicate =
+ _rf_vexpr_set.find(conjunct_expr_root) !=
_rf_vexpr_set.end();
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range = nullptr;
PushDownType pdt = PushDownType::UNACCEPTABLE;
@@ -523,6 +525,10 @@ Status VScanNode::_normalize_predicate(VExpr*
conjunct_expr_root, VExpr** output
_is_predicate_acting_on_slot(cur_expr, eq_predicate_checker,
&slot, &range)) {
std::visit(
[&](auto& value_range) {
+ Defer mark_runtime_filter_flag {[&]() {
+ value_range.mark_runtime_filter_predicate(
+ is_runtimer_filter_predicate);
+ }};
RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
cur_expr, *(_vconjunct_ctx_ptr.get()),
slot, value_range,
&pdt));
@@ -555,7 +561,8 @@ Status VScanNode::_normalize_predicate(VExpr*
conjunct_expr_root, VExpr** output
if (pdt == PushDownType::UNACCEPTABLE &&
TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) {
_normalize_compound_predicate(cur_expr,
*(_vconjunct_ctx_ptr.get()), &pdt,
- in_predicate_checker,
eq_predicate_checker);
+ is_runtimer_filter_predicate,
in_predicate_checker,
+ eq_predicate_checker);
*output_expr = conjunct_expr_root; // remaining in conjunct
tree
return Status::OK();
}
@@ -977,6 +984,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr*
expr, VExprContext* e
Status VScanNode::_normalize_compound_predicate(
vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
+ bool is_runtimer_filter_predicate,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**,
VExpr**)>&
in_predicate_checker,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**,
VExpr**)>&
@@ -997,6 +1005,10 @@ Status VScanNode::_normalize_compound_predicate(
*range_on_slot; // copy, in order not to affect
the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
+ Defer mark_runtime_filter_flag {[&]() {
+ value_range.mark_runtime_filter_predicate(
+ is_runtimer_filter_predicate);
+ }};
_normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
@@ -1015,6 +1027,10 @@ Status VScanNode::_normalize_compound_predicate(
*range_on_slot; // copy, in order not to affect
the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
+ Defer mark_runtime_filter_flag {[&]() {
+ value_range.mark_runtime_filter_predicate(
+ is_runtimer_filter_predicate);
+ }};
_normalize_match_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
@@ -1023,7 +1039,8 @@ Status VScanNode::_normalize_compound_predicate(
_compound_value_ranges.emplace_back(active_range);
}
} else if (TExprNodeType::COMPOUND_PRED ==
child_expr->node_type()) {
- _normalize_compound_predicate(child_expr, expr_ctx, pdt,
in_predicate_checker,
+ _normalize_compound_predicate(child_expr, expr_ctx, pdt,
+ is_runtimer_filter_predicate,
in_predicate_checker,
eq_predicate_checker);
}
}
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index c8d7269087..476c7c37da 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -340,6 +340,7 @@ private:
Status _normalize_compound_predicate(
vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
+ bool is_runtimer_filter_predicate,
const std::function<bool(const std::vector<VExpr*>&, const
VSlotRef**, VExpr**)>&
in_predicate_checker,
const std::function<bool(const std::vector<VExpr*>&, const
VSlotRef**, VExpr**)>&
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
index 2501b308f2..431d5c9cba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
@@ -154,6 +154,11 @@ public abstract class AggregateInfoBase {
Expr expr = exprs.get(i);
SlotDescriptor slotDesc = analyzer.addSlotDescriptor(result);
slotDesc.initFromExpr(expr);
+ if (expr instanceof SlotRef) {
+ if (((SlotRef) expr).getColumn() != null) {
+ slotDesc.setColumn(((SlotRef) expr).getColumn());
+ }
+ }
// Not change the nullable of slot desc when is not grouping set id
if (isGroupingSet && i < aggregateExprStartIndex - 1 && !(expr
instanceof VirtualSlotRef)) {
slotDesc.setIsNullable(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
index 00b871c3b4..0dc41c037d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
@@ -51,6 +51,7 @@ public class SortInfo {
private static final float SORT_MATERIALIZATION_COST_THRESHOLD =
Expr.FUNCTION_CALL_COST;
private List<Expr> orderingExprs;
+ private List<Expr> origOrderingExprs;
private final List<Boolean> isAscOrder;
// True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
private final List<Boolean> nullsFirstParams;
@@ -122,6 +123,10 @@ public class SortInfo {
return orderingExprs;
}
+ /**
+ * Returns the ordering exprs as captured just before substituteOrderingExprs()
+ * rewrites them against the sort tuple. Returns null if that backup has not
+ * been taken yet.
+ * NOTE(review): the backup holds the same List reference as orderingExprs. It
+ * only preserves the original exprs if substituteOrderingExprs() assigns a
+ * new list rather than modifying the existing one in place — confirm.
+ */
+ public List<Expr> getOrigOrderingExprs() {
+ return origOrderingExprs;
+ }
+
public List<Boolean> getIsAscOrder() {
return isAscOrder;
}
@@ -261,6 +266,9 @@ public class SortInfo {
}
}
+ // backup before substitute orderingExprs
+ origOrderingExprs = orderingExprs;
+
// The ordering exprs are evaluated against the sort tuple, so they
must reflect the
// materialization decision above.
substituteOrderingExprs(substOrderBy, analyzer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0d1e3044d0..5c37623b2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -155,6 +155,7 @@ public class OlapScanNode extends ScanNode {
private long totalBytes = 0;
private SortInfo sortInfo = null;
+ private HashSet<Integer> outputColumnUniqueIds = new HashSet<>();
// When scan match sort_info, we can push limit into OlapScanNode.
// It's limit for scanner instead of scanNode so we add a new limit.
@@ -813,6 +814,10 @@ public class OlapScanNode extends ScanNode {
LOG.debug("distribution prune cost: {} ms",
(System.currentTimeMillis() - start));
}
+ /**
+ * Sets the unique ids of the columns whose raw data the BE must still read,
+ * i.e. the columns referenced by the query's output and ORDER BY exprs.
+ * The set is stored by reference, not copied. When non-null, it is sent to
+ * the BE as olap_scan_node.output_column_unique_ids.
+ */
+ public void setOutputColumnUniqueIds(HashSet<Integer>
outputColumnUniqueIds) {
+ this.outputColumnUniqueIds = outputColumnUniqueIds;
+ }
+
/**
* First, determine how many rows to sample from each partition according
to the number of partitions.
* Then determine the number of Tablets to be selected for each partition
according to the average number
@@ -1149,6 +1154,10 @@ public class OlapScanNode extends ScanNode {
if (pushDownAggNoGroupingOp != null) {
msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
}
+
+ if (outputColumnUniqueIds != null) {
+ msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
+ }
}
// export some tablets
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 00328d1ef9..f03c9fe1fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -253,6 +254,10 @@ public class OriginalPlanner extends Planner {
pushDownResultFileSink(analyzer);
+ if (VectorizedUtil.isVectorized()) {
+ pushOutColumnUniqueIdsToOlapScan(rootFragment, analyzer);
+ }
+
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() !=
null) {
@@ -436,6 +441,72 @@ public class OriginalPlanner extends Planner {
}
}
+ /**
+ * outputColumnUniqueIds contains the columns in orderingExprs and outputExprs;
+ * push this set of output column unique ids down to the olap scan node.
+ *
+ * when querying the storage layer, raw data only needs to be read
+ * for the columns in outputColumnUniqueIds.
+ *
+ * for example:
+ * select A from tb where B = 1 and C > 'hello' order by B;
+ *
+ * the column unique ids of `A` and `B` will be put into outputColumnUniqueIds.
+ *
+ */
+ private void pushOutColumnUniqueIdsToOlapScan(PlanFragment rootFragment,
Analyzer analyzer) {
+ HashSet<Integer> outputColumnUniqueIds = new HashSet<>();
+ ArrayList<Expr> outputExprs = rootFragment.getOutputExprs();
+ for (Expr expr : outputExprs) {
+ if (expr instanceof SlotRef) {
+ if (((SlotRef) expr).getColumn() != null) {
+ outputColumnUniqueIds.add(((SlotRef)
expr).getColumn().getUniqueId());
+ }
+ }
+ }
+
+ for (PlanFragment fragment : fragments) {
+ PlanNode node = fragment.getPlanRoot();
+ PlanNode parent = null;
+ while (node.getChildren().size() != 0) {
+ for (PlanNode childNode : node.getChildren()) {
+ List<SlotId> outputSlotIds = childNode.getOutputSlotIds();
+ if (outputSlotIds != null) {
+ for (SlotId sid : outputSlotIds) {
+ SlotDescriptor slotDesc =
analyzer.getSlotDesc(sid);
+ outputColumnUniqueIds.add(slotDesc.getUniqueId());
+ }
+ }
+ }
+ // OlapScanNode is the last node.
+ // So, just keep the last two nodes and check if they are SortNode
and OlapScanNode.
+ parent = node;
+ node = node.getChildren().get(0);
+ }
+
+ if (parent instanceof SortNode) {
+ SortNode sortNode = (SortNode) parent;
+ List<Expr> orderingExprs =
sortNode.getSortInfo().getOrigOrderingExprs();
+ if (orderingExprs != null) {
+ for (Expr expr : orderingExprs) {
+ if (expr instanceof SlotRef) {
+ if (((SlotRef) expr).getColumn() != null) {
+ outputColumnUniqueIds.add(((SlotRef)
expr).getColumn().getUniqueId());
+ }
+ }
+ }
+ }
+ }
+
+ if (!(node instanceof OlapScanNode)) {
+ continue;
+ }
+
+ OlapScanNode scanNode = (OlapScanNode) node;
+ scanNode.setOutputColumnUniqueIds(outputColumnUniqueIds);
+ }
+ }
+
/**
* optimize for topn query like: SELECT * FROM t1 WHERE a>100 ORDER BY b,c
LIMIT 100
* the pre-requirement is as follows:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 4c6cf88fad..86c0520b27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1130,4 +1130,8 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
public List<Expr> getProjectList() {
return projectList;
}
+
+ public List<SlotId> getOutputSlotIds() {
+ return outputSlotIds;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
index a621994b34..244b7c9288 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
@@ -409,7 +409,7 @@ public class TableFunctionPlanTest {
"SlotDescriptor{id=1,col=k2,colUniqueId=1,type=VARCHAR(1)"
));
Assert.assertTrue(formatString.contains(
- "SlotDescriptor{id=2,col=null,colUniqueId=null,type=INT"
+ "SlotDescriptor{id=2,col=k1,colUniqueId=0,type=INT"
));
Assert.assertTrue(formatString.contains(
"SlotDescriptor{id=3,col=null,colUniqueId=null,type=VARCHAR(*)"
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 59945cb932..52404f1615 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -545,6 +545,7 @@ struct TCondition {
// In delete condition, the different column may have same column name,
need
// using unique id to distinguish them
4: optional i32 column_unique_id
+ 5: optional bool marked_by_runtime_filter = false
}
struct TExportStatusResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index e7853a1bc7..1eb2372977 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -575,6 +575,7 @@ struct TOlapScanNode {
12: optional TPushAggOp push_down_agg_type_opt
13: optional bool use_topn_opt
14: optional list<Descriptors.TOlapTableIndex> indexes_desc
+ 15: optional set<i32> output_column_unique_ids
}
struct TEqJoinCondition {
diff --git
a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
index ebf0a7f4b9..365811f3a6 100644
---
a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
+++
b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
@@ -35,7 +35,7 @@ suite("test_add_drop_index_with_delete_data",
"inverted_index"){
assertTrue(useTime <= OpTimeout)
}
- def indexTbName1 = "test_add_drop_inverted_index2"
+ def indexTbName1 = "test_add_drop_inverted_index3"
sql "DROP TABLE IF EXISTS ${indexTbName1}"
// create 1 replica table
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]