This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ab55d27d [Optimization](index) Optimization for no need to read raw 
data for index column that only in where clause (#16569)
f3ab55d27d is described below

commit f3ab55d27d3232daf0d50acd3f06ff13bbdfddd0
Author: YueW <[email protected]>
AuthorDate: Tue Feb 14 00:12:45 2023 +0800

    [Optimization](index) Optimization for no need to read raw data for index 
column that only in where clause (#16569)
---
 be/src/exec/olap_common.h                          |  10 ++
 be/src/olap/column_predicate.h                     |   1 +
 be/src/olap/iterators.h                            |   1 +
 be/src/olap/reader.cpp                             |  11 +-
 be/src/olap/reader.h                               |   3 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |   1 +
 be/src/olap/rowset/rowset_reader_context.h         |   1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 162 ++++++++++++++++++++-
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  41 +++++-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   9 ++
 be/src/vec/exec/scan/new_olap_scan_node.h          |   2 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   1 +
 be/src/vec/exec/scan/vscan_node.cpp                |  21 ++-
 be/src/vec/exec/scan/vscan_node.h                  |   1 +
 .../apache/doris/analysis/AggregateInfoBase.java   |   5 +
 .../java/org/apache/doris/analysis/SortInfo.java   |   8 +
 .../org/apache/doris/planner/OlapScanNode.java     |   9 ++
 .../org/apache/doris/planner/OriginalPlanner.java  |  71 +++++++++
 .../java/org/apache/doris/planner/PlanNode.java    |   4 +
 .../doris/planner/TableFunctionPlanTest.java       |   2 +-
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 .../test_add_drop_index_with_delete_data.groovy    |   2 +-
 23 files changed, 358 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index ca3710002d..19548b59f3 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -199,6 +199,7 @@ public:
             if (TYPE_MIN != _low_value || FILTER_LARGER_OR_EQUAL != _low_op) {
                 low.__set_column_name(_column_name);
                 low.__set_condition_op((_low_op == FILTER_LARGER_OR_EQUAL ? 
">=" : ">>"));
+                
low.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
                 low.condition_values.push_back(
                         cast_to_string<primitive_type, CppType>(_low_value, 
_scale));
             }
@@ -211,6 +212,7 @@ public:
             if (TYPE_MAX != _high_value || FILTER_LESS_OR_EQUAL != _high_op) {
                 high.__set_column_name(_column_name);
                 high.__set_condition_op((_high_op == FILTER_LESS_OR_EQUAL ? 
"<=" : "<<"));
+                
high.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
                 high.condition_values.push_back(
                         cast_to_string<primitive_type, CppType>(_high_value, 
_scale));
             }
@@ -237,6 +239,7 @@ public:
         TCondition condition;
         condition.__set_column_name(_column_name);
         condition.__set_condition_op(is_in ? "*=" : "!*=");
+        
condition.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
 
         for (const auto& value : _fixed_values) {
             condition.condition_values.push_back(
@@ -333,6 +336,12 @@ public:
         _contain_null = contain_null;
     }
 
+    void mark_runtime_filter_predicate(bool is_runtime_filter_predicate) {
+        _marked_runtime_filter_predicate = is_runtime_filter_predicate;
+    }
+
+    bool get_marked_by_runtime_filter() const { return 
_marked_runtime_filter_predicate; }
+
     int precision() const { return _precision; }
 
     int scale() const { return _scale; }
@@ -413,6 +422,7 @@ private:
 
     // range value except leaf node of and node in compound expr tree
     std::set<std::pair<SQLFilterOp, CppType>> _compound_values;
+    bool _marked_runtime_filter_predicate = false;
 };
 
 class OlapScanKeys {
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 2af921ddf4..c87adaafb8 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -34,6 +34,7 @@ class Schema;
 
 struct PredicateParams {
     std::string value;
+    bool marked_by_runtime_filter = false;
 };
 
 enum class PredicateType {
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 61ce931e52..3cc8da46ca 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -119,6 +119,7 @@ public:
     std::vector<uint32_t>* read_orderby_key_columns = nullptr;
     IOContext io_ctx;
     vectorized::VExpr* remaining_vconjunct_root = nullptr;
+    const std::set<int32_t>* output_columns = nullptr;
     // runtime state
     RuntimeState* runtime_state = nullptr;
     RowsetId rowset_id;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 0d116dd0ae..8d59085908 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params,
     _reader_context.record_rowids = read_params.record_rowids;
     _reader_context.is_key_column_group = read_params.is_key_column_group;
     _reader_context.remaining_vconjunct_root = 
read_params.remaining_vconjunct_root;
+    _reader_context.output_columns = &read_params.output_columns;
 
     *valid_rs_readers = *rs_readers;
 
@@ -457,6 +458,7 @@ void TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
             // _gen_predicate_result_sign will build predicate result unique 
sign with condition value
             auto predicate_params = predicate->predicate_params();
             predicate_params->value = condition.condition_values[0];
+            predicate_params->marked_by_runtime_filter = 
condition.marked_by_runtime_filter;
             if (_tablet_schema->column_by_uid(condition_col_uid).aggregation() 
!=
                 FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
                 _value_col_predicates.push_back(predicate);
@@ -476,7 +478,13 @@ void TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
     }
 
     for (const auto& filter : read_params.in_filters) {
-        _col_predicates.emplace_back(_parse_to_predicate(filter));
+        ColumnPredicate* predicate = _parse_to_predicate(filter);
+        if (predicate != nullptr) {
+            // in_filters from runtime filter predicates which pushed down to 
data source.
+            auto predicate_params = predicate->predicate_params();
+            predicate_params->marked_by_runtime_filter = true;
+        }
+        _col_predicates.emplace_back(predicate);
     }
 
     // Function filter push down to storage engine
@@ -523,6 +531,7 @@ void 
TabletReader::_init_conditions_param_except_leafnode_of_andnode(
                 parse_to_predicate(_tablet_schema, tmp_cond, 
_predicate_mem_pool.get());
         if (predicate != nullptr) {
             auto predicate_params = predicate->predicate_params();
+            predicate_params->marked_by_runtime_filter = 
condition.marked_by_runtime_filter;
             predicate_params->value = condition.condition_values[0];
             _col_preds_except_leafnode_of_andnode.push_back(predicate);
         }
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 3abccc8044..4168341738 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -101,7 +101,10 @@ public:
         DeleteBitmap* delete_bitmap {nullptr};
 
         std::vector<RowsetReaderSharedPtr> rs_readers;
+        // return_columns is init from query schema
         std::vector<uint32_t> return_columns;
+        // output_columns only contain columns in OrderByExprs and outputExprs
+        std::set<int32_t> output_columns;
         RuntimeProfile* profile = nullptr;
         RuntimeState* runtime_state = nullptr;
 
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0f978e3c3a..2ba9e2fe85 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -163,6 +163,7 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     _read_options.read_orderby_key_columns = 
read_context->read_orderby_key_columns;
     _read_options.io_ctx.reader_type = read_context->reader_type;
     _read_options.runtime_state = read_context->runtime_state;
+    _read_options.output_columns = read_context->output_columns;
 
     // load segments
     // use cache is true when do vertica compaction
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index cd52884fc9..e342a5a824 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -77,6 +77,7 @@ struct RowsetReaderContext {
     bool record_rowids = false;
     bool is_vertical_compaction = false;
     bool is_key_column_group = false;
+    const std::set<int32_t>* output_columns = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 0f29a4e01a..39ada1e7d5 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -176,7 +176,14 @@ Status SegmentIterator::init(const StorageReadOptions& 
opts) {
     if (!opts.column_predicates_except_leafnode_of_andnode.empty()) {
         _col_preds_except_leafnode_of_andnode = 
opts.column_predicates_except_leafnode_of_andnode;
     }
+
+    if (opts.output_columns != nullptr) {
+        _output_columns = *(opts.output_columns);
+    }
+
     _remaining_vconjunct_root = opts.remaining_vconjunct_root;
+    _column_predicate_info.reset(new ColumnPredicateInfo());
+    _calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root);
 
     _column_predicate_info.reset(new ColumnPredicateInfo());
     if (_schema.rowid_col_idx() > 0) {
@@ -419,12 +426,19 @@ Status SegmentIterator::_apply_bitmap_index() {
     for (auto pred : _col_predicates) {
         int32_t unique_id = _schema.unique_id(pred->column_id());
         if (_bitmap_index_iterators.count(unique_id) < 1 ||
-            _bitmap_index_iterators[unique_id] == nullptr) {
+            _bitmap_index_iterators[unique_id] == nullptr || pred->type() == 
PredicateType::BF) {
             // no bitmap index for this column
             remaining_predicates.push_back(pred);
         } else {
             RETURN_IF_ERROR(pred->evaluate(_bitmap_index_iterators[unique_id], 
_segment->num_rows(),
                                            &_row_bitmap));
+
+            auto column_name = _schema.column(pred->column_id())->name();
+            if (_check_column_pred_all_push_down(column_name) &&
+                !pred->predicate_params()->marked_by_runtime_filter) {
+                _need_read_data_indices[unique_id] = false;
+            }
+
             if (_row_bitmap.isEmpty()) {
                 break; // all rows have been pruned, no need to process 
further predicates
             }
@@ -518,7 +532,8 @@ Status SegmentIterator::_execute_compound_fn(const 
std::string& function_name) {
 
 bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
     for (auto pred : _col_preds_except_leafnode_of_andnode) {
-        if (!_check_apply_by_bitmap_index(pred) && 
!_check_apply_by_inverted_index(pred)) {
+        if (_not_apply_index_pred.count(pred->column_id()) ||
+            (!_check_apply_by_bitmap_index(pred) && 
!_check_apply_by_inverted_index(pred))) {
             return false;
         }
     }
@@ -597,6 +612,7 @@ Status 
SegmentIterator::_apply_index_except_leafnode_of_andnode() {
                  pred->type() != PredicateType::MATCH) ||
                 res.code() == ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT) {
                 // downgrade without index query
+                _not_apply_index_pred.insert(pred->column_id());
                 continue;
             }
             LOG(WARNING) << "failed to evaluate index"
@@ -610,6 +626,16 @@ Status 
SegmentIterator::_apply_index_except_leafnode_of_andnode() {
                 std::make_pair(pred_result_sign, std::make_pair(true, 
bitmap)));
     }
 
+    for (auto pred : _col_preds_except_leafnode_of_andnode) {
+        auto column_name = _schema.column(pred->column_id())->name();
+        if (_remaining_vconjunct_root != nullptr &&
+            _check_column_pred_all_push_down(column_name, true) &&
+            !pred->predicate_params()->marked_by_runtime_filter) {
+            int32_t unique_id = _schema.unique_id(pred->column_id());
+            _need_read_data_indices[unique_id] = false;
+        }
+    }
+
     return Status::OK();
 }
 
@@ -645,6 +671,11 @@ bool 
SegmentIterator::_is_handle_predicate_by_fulltext(int32_t unique_id) {
     std::all_of(predicate_set.begin(), predicate_set.end(), \
                 [](const ColumnPredicate* p) { return 
PredicateTypeTraits::is_range(p->type()); })
 
+#define all_predicates_are_marked_by_runtime_filter(predicate_set)             
               \
+    std::all_of(predicate_set.begin(), predicate_set.end(), [](const 
ColumnPredicate* p) {    \
+        return 
const_cast<ColumnPredicate*>(p)->predicate_params()->marked_by_runtime_filter; \
+    })
+
 Status SegmentIterator::_apply_inverted_index_on_column_predicate(
         ColumnPredicate* pred, std::vector<ColumnPredicate*>& 
remaining_predicates,
         bool* continue_apply) {
@@ -679,6 +710,12 @@ Status 
SegmentIterator::_apply_inverted_index_on_column_predicate(
             return res;
         }
 
+        auto column_name = _schema.column(pred->column_id())->name();
+        if (_check_column_pred_all_push_down(column_name) &&
+            !pred->predicate_params()->marked_by_runtime_filter) {
+            _need_read_data_indices[unique_id] = false;
+        }
+
         auto pred_type = pred->type();
         if (pred_type == PredicateType::MATCH) {
             std::string pred_result_sign = _gen_predicate_result_sign(pred);
@@ -721,6 +758,10 @@ Status 
SegmentIterator::_apply_inverted_index_on_block_column_predicate(
                                   &output_result);
 
         if (res.ok()) {
+            if (_check_column_pred_all_push_down(column_name) &&
+                !all_predicates_are_marked_by_runtime_filter(predicate_set)) {
+                _need_read_data_indices[unique_id] = false;
+            }
             no_need_to_pass_column_predicate_set.insert(predicate_set.begin(), 
predicate_set.end());
             _row_bitmap &= output_result;
             if (_row_bitmap.isEmpty()) {
@@ -742,6 +783,17 @@ Status 
SegmentIterator::_apply_inverted_index_on_block_column_predicate(
     return Status::OK();
 }
 
+bool SegmentIterator::_need_read_data(ColumnId cid) {
+    int32_t unique_id = _schema.unique_id(cid);
+    if (_need_read_data_indices.count(unique_id) > 0 && 
!_need_read_data_indices[unique_id] &&
+        _output_columns.count(unique_id) < 1) {
+        VLOG_DEBUG << "SegmentIterator no need read data for column: "
+                   << _opts.tablet_schema->column_by_uid(unique_id).name();
+        return false;
+    }
+    return true;
+}
+
 Status SegmentIterator::_apply_inverted_index() {
     SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
     size_t input_rows = _row_bitmap.cardinality();
@@ -993,6 +1045,9 @@ Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
 
 Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, 
rowid_t pos) {
     for (auto cid : column_ids) {
+        if (!_need_read_data(cid)) {
+            continue;
+        }
         
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->seek_to_ordinal(pos));
     }
     return Status::OK();
@@ -1201,11 +1256,33 @@ void SegmentIterator::_vec_init_char_column_id() {
     }
 }
 
+bool SegmentIterator::_prune_column(ColumnId cid, 
vectorized::MutableColumnPtr& column,
+                                    bool fill_defaults, size_t 
num_of_defaults) {
+    if (_need_read_data(cid)) {
+        return false;
+    }
+    if (!fill_defaults) {
+        return true;
+    }
+    if (column->is_nullable()) {
+        auto nullable_col_ptr = 
reinterpret_cast<vectorized::ColumnNullable*>(column.get());
+        
nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
+        
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
+    } else {
+        // assert(column->is_const());
+        column->insert_many_defaults(num_of_defaults);
+    }
+    return true;
+}
+
 Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
                                       vectorized::MutableColumns& 
column_block, size_t nrows) {
     for (auto cid : column_ids) {
         auto& column = column_block[cid];
         size_t rows_read = nrows;
+        if (_prune_column(cid, column, true, rows_read)) {
+            continue;
+        }
         
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->next_batch(&rows_read,
 column));
         DCHECK_EQ(nrows, rows_read);
     }
@@ -1370,7 +1447,8 @@ uint16_t 
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
 
 Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids,
                                                 std::vector<rowid_t>& 
rowid_vector,
-                                                uint16_t* sel_rowid_idx, 
size_t select_size) {
+                                                uint16_t* sel_rowid_idx, 
size_t select_size,
+                                                vectorized::MutableColumns* 
mutable_columns) {
     SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
     std::vector<rowid_t> rowids(select_size);
     for (size_t i = 0; i < select_size; ++i) {
@@ -1378,6 +1456,9 @@ Status 
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
     }
 
     for (auto cid : read_column_ids) {
+        if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) {
+            continue;
+        }
         
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->read_by_rowids(
                 rowids.data(), select_size, _current_return_columns[cid]));
     }
@@ -1493,7 +1574,8 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
 
         // step3: read non_predicate column
         RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, 
_block_rowids,
-                                                sel_rowid_idx, selected_size));
+                                                sel_rowid_idx, selected_size,
+                                                &_current_return_columns));
 
         // step4: output columns
         // 4.1 output non-predicate column
@@ -1642,5 +1724,77 @@ Status 
SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
     return Status::OK();
 }
 
+/**
+ * solution 1: where clause included nodes are all `and` leaf nodes,
+ * predicate pushed down and remove from vconjunct.
+ *  for example: where A = 1 and B = 'test' and B like '%he%';
+ *      column A : `A = 1` pushed down, this column's predicates all pushed 
down,
+ *                  call _check_column_pred_all_push_down will return true.
+ *      column B : `B = 'test'` pushed down, but `B like '%he%'` remain in 
vconjunct,
+ *                  call _check_column_pred_all_push_down will return false.
+ *
+ * solution 2: where clause included nodes are compound or other complex 
conditions,
+ * predicate pushed down but still remain in vconjunct.
+ *  for example: where (A = 1 and B = 'test') or B = 'hi' or (C like '%ye%' and 
C > 'aa');
+ *      column A : `A = 1` pushed down, check it applied by index,
+ *                  call _check_column_pred_all_push_down will return true.
+ *      column B : `B = 'test'`, `B = 'hi'` all pushed down, check them all 
applied by index,
+ *                  call _check_column_pred_all_push_down will return true.
+ *      column C : `C like '%ye%'` not pushed down, `C > 'aa'` pushed down, 
only `C > 'aa'` applied by index,
+ *                  call _check_column_pred_all_push_down will return false.
+*/
+bool SegmentIterator::_check_column_pred_all_push_down(const std::string& 
column_name,
+                                                       bool in_compound) {
+    if (_remaining_vconjunct_root == nullptr) {
+        return true;
+    }
+
+    if (in_compound) {
+        auto preds_in_remaining_vconjuct = 
_column_pred_in_remaining_vconjunct[column_name];
+        for (auto pred_info : preds_in_remaining_vconjuct) {
+            auto column_sign = _gen_predicate_result_sign(&pred_info);
+            if (_rowid_result_for_index.count(column_sign) < 1) {
+                return false;
+            }
+        }
+    } else {
+        if (_column_pred_in_remaining_vconjunct[column_name].size() != 0) {
+            return false;
+        }
+    }
+    return true;
+}
+
+void SegmentIterator::_calculate_pred_in_remaining_vconjunct_root(const 
vectorized::VExpr* expr) {
+    if (expr == nullptr) {
+        return;
+    }
+
+    auto children = expr->children();
+    for (int i = 0; i < children.size(); ++i) {
+        _calculate_pred_in_remaining_vconjunct_root(children[i]);
+    }
+
+    auto node_type = expr->node_type();
+    if (node_type == TExprNodeType::SLOT_REF) {
+        _column_predicate_info->column_name = expr->expr_name();
+    } else if (_is_literal_node(node_type)) {
+        auto v_literal_expr = static_cast<const 
doris::vectorized::VLiteral*>(expr);
+        _column_predicate_info->query_value = v_literal_expr->value();
+    } else {
+        if (node_type == TExprNodeType::MATCH_PRED) {
+            _column_predicate_info->query_op = "match";
+        } else if (node_type != TExprNodeType::COMPOUND_PRED) {
+            _column_predicate_info->query_op = expr->fn().name.function_name;
+        }
+
+        if (!_column_predicate_info->is_empty()) {
+            
_column_pred_in_remaining_vconjunct[_column_predicate_info->column_name].push_back(
+                    *_column_predicate_info);
+            _column_predicate_info.reset(new ColumnPredicateInfo());
+        }
+    }
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index f8ec46333b..b976fc53c7 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -51,6 +51,32 @@ class ColumnIterator;
 
 struct ColumnPredicateInfo {
     ColumnPredicateInfo() = default;
+
+    std::string debug_string() const {
+        std::stringstream ss;
+        ss << "column_name=" << column_name << ", query_op=" << query_op
+           << ", query_value=" << query_value;
+        return ss.str();
+    }
+
+    bool is_empty() const { return column_name.empty() && query_value.empty() 
&& query_op.empty(); }
+
+    bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
+        if (column_pred_info.column_name != column_name) {
+            return false;
+        }
+
+        if (column_pred_info.query_value != query_value) {
+            return false;
+        }
+
+        if (column_pred_info.query_op != query_op) {
+            return false;
+        }
+
+        return true;
+    }
+
     std::string column_name;
     std::string query_value;
     std::string query_op;
@@ -167,7 +193,7 @@ private:
     void _output_non_pred_columns(vectorized::Block* block);
     Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                    std::vector<rowid_t>& rowid_vector, 
uint16_t* sel_rowid_idx,
-                                   size_t select_size);
+                                   size_t select_size, 
vectorized::MutableColumns* mutable_columns);
 
     template <class Container>
     Status _output_column_by_sel_idx(vectorized::Block* block, const 
Container& column_ids,
@@ -203,6 +229,14 @@ private:
     void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t 
select_size,
                                      vectorized::Block* block);
 
+    bool _need_read_data(ColumnId cid);
+    bool _prune_column(ColumnId cid, vectorized::MutableColumnPtr& column, 
bool fill_defaults,
+                       size_t num_of_defaults);
+
+    // return true means one column's predicates all pushed down
+    bool _check_column_pred_all_push_down(const std::string& column_name, bool 
in_compound = false);
+    void _calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr* 
expr);
+
 private:
     // todo(wb) remove this method after RowCursor is removed
     void _convert_rowcursor_to_short_key(const RowCursor& key, size_t 
num_keys) {
@@ -297,6 +331,7 @@ private:
     std::vector<ColumnId>
             _short_cir_pred_column_ids; // keep columnId of columns for short 
circuit predicate evaluation
     std::vector<bool> _is_pred_column; // columns hold by segmentIter
+    std::map<uint32_t, bool> _need_read_data_indices;
     vectorized::MutableColumns _current_return_columns;
     std::vector<ColumnPredicate*> _pre_eval_block_predicate;
     std::vector<ColumnPredicate*> _short_cir_eval_predicate;
@@ -320,8 +355,12 @@ private:
     doris::vectorized::VExpr* _remaining_vconjunct_root;
     std::vector<roaring::Roaring> 
_pred_except_leafnode_of_andnode_evaluate_result;
     std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;
+    std::unordered_map<std::string, std::vector<ColumnPredicateInfo>>
+            _column_pred_in_remaining_vconjunct;
+    std::set<ColumnId> _not_apply_index_pred;
 
     std::shared_ptr<ColumnPredicate> _runtime_predicate {nullptr};
+    std::set<int32_t> _output_columns;
 
     // row schema of the key to seek
     // only used in `_get_row_ranges_by_keys`
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 0285189ef1..2978d806db 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -372,6 +372,15 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                                           
(*_vconjunct_ctx_ptr)->root()->debug_string());
     }
 
+    if (!_olap_scan_node.output_column_unique_ids.empty()) {
+        for (auto uid : _olap_scan_node.output_column_unique_ids) {
+            if (uid < 0) {
+                continue;
+            }
+            _maybe_read_column_ids.emplace(uid);
+        }
+    }
+
     // ranges constructed from scan keys
     std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
     RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index c7a346b6db..b179f31060 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -68,6 +68,8 @@ private:
     // _compound_filters store conditions in the one compound relationship in 
conjunct expr tree except leaf node of `and` node,
     // such as: "(a or b) and (c or d)", conditions for a,b,c,d will be stored
     std::vector<TCondition> _compound_filters;
+    // If a column id is in this set, we still need to read its data after 
index filtering
+    std::set<int32_t> _maybe_read_column_ids;
 
 private:
     std::unique_ptr<RuntimeProfile> _segment_profile;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 8dc2e80e2f..a1d8ed0cea 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -215,6 +215,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
     _tablet_reader_params.version = Version(0, _version);
     _tablet_reader_params.remaining_vconjunct_root =
             (_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root();
+    _tablet_reader_params.output_columns = 
((NewOlapScanNode*)_parent)->_maybe_read_column_ids;
 
     // Condition
     for (auto& filter : filters) {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 53cf4930f5..f151a22014 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -511,6 +511,8 @@ Status VScanNode::_normalize_predicate(VExpr* 
conjunct_expr_root, VExpr** output
             auto impl = conjunct_expr_root->get_impl();
             // If impl is not null, which means this a conjuncts from runtime 
filter.
             VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : 
conjunct_expr_root;
+            bool is_runtimer_filter_predicate =
+                    _rf_vexpr_set.find(conjunct_expr_root) != 
_rf_vexpr_set.end();
             SlotDescriptor* slot = nullptr;
             ColumnValueRangeType* range = nullptr;
             PushDownType pdt = PushDownType::UNACCEPTABLE;
@@ -523,6 +525,10 @@ Status VScanNode::_normalize_predicate(VExpr* 
conjunct_expr_root, VExpr** output
                 _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, 
&slot, &range)) {
                 std::visit(
                         [&](auto& value_range) {
+                            Defer mark_runtime_filter_flag {[&]() {
+                                value_range.mark_runtime_filter_predicate(
+                                        is_runtimer_filter_predicate);
+                            }};
                             RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
                                     cur_expr, *(_vconjunct_ctx_ptr.get()), 
slot, value_range,
                                     &pdt));
@@ -555,7 +561,8 @@ Status VScanNode::_normalize_predicate(VExpr* 
conjunct_expr_root, VExpr** output
             if (pdt == PushDownType::UNACCEPTABLE &&
                 TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) {
                 _normalize_compound_predicate(cur_expr, 
*(_vconjunct_ctx_ptr.get()), &pdt,
-                                              in_predicate_checker, 
eq_predicate_checker);
+                                              is_runtimer_filter_predicate, 
in_predicate_checker,
+                                              eq_predicate_checker);
                 *output_expr = conjunct_expr_root; // remaining in conjunct 
tree
                 return Status::OK();
             }
@@ -977,6 +984,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr* 
expr, VExprContext* e
 
 Status VScanNode::_normalize_compound_predicate(
         vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
+        bool is_runtimer_filter_predicate,
         const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, 
VExpr**)>&
                 in_predicate_checker,
         const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, 
VExpr**)>&
@@ -997,6 +1005,10 @@ Status VScanNode::_normalize_compound_predicate(
                             *range_on_slot; // copy, in order not to affect 
the range in the _colname_to_value_range
                     std::visit(
                             [&](auto& value_range) {
+                                Defer mark_runtime_filter_flag {[&]() {
+                                    value_range.mark_runtime_filter_predicate(
+                                            is_runtimer_filter_predicate);
+                                }};
                                 
_normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot,
                                                                         
value_range, pdt);
                             },
@@ -1015,6 +1027,10 @@ Status VScanNode::_normalize_compound_predicate(
                             *range_on_slot; // copy, in order not to affect 
the range in the _colname_to_value_range
                     std::visit(
                             [&](auto& value_range) {
+                                Defer mark_runtime_filter_flag {[&]() {
+                                    value_range.mark_runtime_filter_predicate(
+                                            is_runtimer_filter_predicate);
+                                }};
                                 
_normalize_match_in_compound_predicate(child_expr, expr_ctx, slot,
                                                                        
value_range, pdt);
                             },
@@ -1023,7 +1039,8 @@ Status VScanNode::_normalize_compound_predicate(
                     _compound_value_ranges.emplace_back(active_range);
                 }
             } else if (TExprNodeType::COMPOUND_PRED == 
child_expr->node_type()) {
-                _normalize_compound_predicate(child_expr, expr_ctx, pdt, 
in_predicate_checker,
+                _normalize_compound_predicate(child_expr, expr_ctx, pdt,
+                                              is_runtimer_filter_predicate, 
in_predicate_checker,
                                               eq_predicate_checker);
             }
         }
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index c8d7269087..476c7c37da 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -340,6 +340,7 @@ private:
 
     Status _normalize_compound_predicate(
             vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
+            bool is_runtimer_filter_predicate,
             const std::function<bool(const std::vector<VExpr*>&, const 
VSlotRef**, VExpr**)>&
                     in_predicate_checker,
             const std::function<bool(const std::vector<VExpr*>&, const 
VSlotRef**, VExpr**)>&
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
index 2501b308f2..431d5c9cba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java
@@ -154,6 +154,11 @@ public abstract class AggregateInfoBase {
             Expr expr = exprs.get(i);
             SlotDescriptor slotDesc = analyzer.addSlotDescriptor(result);
             slotDesc.initFromExpr(expr);
+            if (expr instanceof SlotRef) {
+                if (((SlotRef) expr).getColumn() != null) {
+                    slotDesc.setColumn(((SlotRef) expr).getColumn());
+                }
+            }
             // Not change the nullable of slot desc when is not grouping set id
             if (isGroupingSet && i < aggregateExprStartIndex - 1 && !(expr 
instanceof VirtualSlotRef)) {
                 slotDesc.setIsNullable(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
index 00b871c3b4..0dc41c037d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
@@ -51,6 +51,7 @@ public class SortInfo {
     private static final float SORT_MATERIALIZATION_COST_THRESHOLD = 
Expr.FUNCTION_CALL_COST;
 
     private List<Expr> orderingExprs;
+    private List<Expr> origOrderingExprs;
     private final List<Boolean> isAscOrder;
     // True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
     private final List<Boolean> nullsFirstParams;
@@ -122,6 +123,10 @@ public class SortInfo {
         return orderingExprs;
     }
 
+    public List<Expr> getOrigOrderingExprs() {
+        return origOrderingExprs;
+    }
+
     public List<Boolean> getIsAscOrder() {
         return isAscOrder;
     }
@@ -261,6 +266,9 @@ public class SortInfo {
             }
         }
 
+        // back up the original orderingExprs before they are substituted
+        origOrderingExprs = orderingExprs;
+
         // The ordering exprs are evaluated against the sort tuple, so they 
must reflect the
         // materialization decision above.
         substituteOrderingExprs(substOrderBy, analyzer);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0d1e3044d0..5c37623b2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -155,6 +155,7 @@ public class OlapScanNode extends ScanNode {
     private long totalBytes = 0;
 
     private SortInfo sortInfo = null;
+    private HashSet<Integer> outputColumnUniqueIds = new HashSet<>();
 
     // When scan match sort_info, we can push limit into OlapScanNode.
     // It's limit for scanner instead of scanNode so we add a new limit.
@@ -813,6 +814,10 @@ public class OlapScanNode extends ScanNode {
         LOG.debug("distribution prune cost: {} ms", 
(System.currentTimeMillis() - start));
     }
 
+    public void setOutputColumnUniqueIds(HashSet<Integer> 
outputColumnUniqueIds) {
+        this.outputColumnUniqueIds = outputColumnUniqueIds;
+    }
+
     /**
      * First, determine how many rows to sample from each partition according 
to the number of partitions.
      * Then determine the number of Tablets to be selected for each partition 
according to the average number
@@ -1149,6 +1154,10 @@ public class OlapScanNode extends ScanNode {
         if (pushDownAggNoGroupingOp != null) {
             msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
         }
+
+        if (outputColumnUniqueIds != null) {
+            msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
+        }
     }
 
     // export some tablets
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 00328d1ef9..f03c9fe1fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -253,6 +254,10 @@ public class OriginalPlanner extends Planner {
 
         pushDownResultFileSink(analyzer);
 
+        if (VectorizedUtil.isVectorized()) {
+            pushOutColumnUniqueIdsToOlapScan(rootFragment, analyzer);
+        }
+
         if (queryStmt instanceof SelectStmt) {
             SelectStmt selectStmt = (SelectStmt) queryStmt;
             if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != 
null) {
@@ -436,6 +441,72 @@ public class OriginalPlanner extends Planner {
         }
     }
 
+    /**
+     * outputColumnUniqueIds contains the columns in OrderByExprs and outputExprs;
+     * push the output column unique id set down to the olap scan.
+     *
+     * When querying the storage layer, raw data needs to be read
+     * for the columns that are in outputColumnUniqueIds.
+     *
+     * For example:
+     * select A from tb where B = 1 and C > 'hello' order by B;
+     *
+     * The column unique ids for `A` and `B` will be put into outputColumnUniqueIds.
+     *
+    */
+    private void pushOutColumnUniqueIdsToOlapScan(PlanFragment rootFragment, 
Analyzer analyzer) {
+        HashSet<Integer> outputColumnUniqueIds =  new HashSet<>();
+        ArrayList<Expr> outputExprs = rootFragment.getOutputExprs();
+        for (Expr expr : outputExprs) {
+            if (expr instanceof SlotRef) {
+                if (((SlotRef) expr).getColumn() != null) {
+                    outputColumnUniqueIds.add(((SlotRef) 
expr).getColumn().getUniqueId());
+                }
+            }
+        }
+
+        for (PlanFragment fragment : fragments) {
+            PlanNode node = fragment.getPlanRoot();
+            PlanNode parent = null;
+            while (node.getChildren().size() != 0) {
+                for (PlanNode childNode : node.getChildren()) {
+                    List<SlotId> outputSlotIds = childNode.getOutputSlotIds();
+                    if (outputSlotIds != null) {
+                        for (SlotId sid : outputSlotIds) {
+                            SlotDescriptor slotDesc = 
analyzer.getSlotDesc(sid);
+                            outputColumnUniqueIds.add(slotDesc.getUniqueId());
+                        }
+                    }
+                }
+                // OlapScanNode is the last node.
+                // So, just get the last two nodes and check whether they are a SortNode 
and an OlapScanNode.
+                parent = node;
+                node = node.getChildren().get(0);
+            }
+
+            if (parent instanceof SortNode) {
+                SortNode sortNode = (SortNode) parent;
+                List<Expr> orderingExprs = 
sortNode.getSortInfo().getOrigOrderingExprs();
+                if (orderingExprs != null) {
+                    for (Expr expr : orderingExprs) {
+                        if (expr instanceof SlotRef) {
+                            if (((SlotRef) expr).getColumn() != null) {
+                                outputColumnUniqueIds.add(((SlotRef) 
expr).getColumn().getUniqueId());
+                            }
+                        }
+                    }
+                }
+            }
+
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            scanNode.setOutputColumnUniqueIds(outputColumnUniqueIds);
+        }
+    }
+
     /**
      * optimize for topn query like: SELECT * FROM t1 WHERE a>100 ORDER BY b,c 
LIMIT 100
      * the pre-requirement is as follows:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 4c6cf88fad..86c0520b27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1130,4 +1130,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
     public List<Expr> getProjectList() {
         return projectList;
     }
+
+    public List<SlotId> getOutputSlotIds() {
+        return outputSlotIds;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
index a621994b34..244b7c9288 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
@@ -409,7 +409,7 @@ public class TableFunctionPlanTest {
                 "SlotDescriptor{id=1,col=k2,colUniqueId=1,type=VARCHAR(1)"
         ));
         Assert.assertTrue(formatString.contains(
-                "SlotDescriptor{id=2,col=null,colUniqueId=null,type=INT"
+                "SlotDescriptor{id=2,col=k1,colUniqueId=0,type=INT"
         ));
         Assert.assertTrue(formatString.contains(
                 "SlotDescriptor{id=3,col=null,colUniqueId=null,type=VARCHAR(*)"
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 59945cb932..52404f1615 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -545,6 +545,7 @@ struct TCondition {
     // In delete condition, the different column may have same column name, 
need
     // using unique id to distinguish them
     4:  optional i32 column_unique_id
+    5:  optional bool marked_by_runtime_filter = false
 }
 
 struct TExportStatusResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index e7853a1bc7..1eb2372977 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -575,6 +575,7 @@ struct TOlapScanNode {
   12: optional TPushAggOp push_down_agg_type_opt
   13: optional bool use_topn_opt
   14: optional list<Descriptors.TOlapTableIndex> indexes_desc
+  15: optional set<i32> output_column_unique_ids
 }
 
 struct TEqJoinCondition {
diff --git 
a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
 
b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
index ebf0a7f4b9..365811f3a6 100644
--- 
a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
+++ 
b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy
@@ -35,7 +35,7 @@ suite("test_add_drop_index_with_delete_data", 
"inverted_index"){
         assertTrue(useTime <= OpTimeout)
     }
 
-    def indexTbName1 = "test_add_drop_inverted_index2"
+    def indexTbName1 = "test_add_drop_inverted_index3"
 
     sql "DROP TABLE IF EXISTS ${indexTbName1}"
     // create 1 replica table


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to