This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpc_preview6 by this push:
     new 115e2306cf0 adjust conjunct order by execute cost
115e2306cf0 is described below

commit 115e2306cf0405b9f67e89fcd856416c98dd4240
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Jan 19 18:31:45 2026 +0800

    adjust conjunct order by execute cost
    
    thrift: add enable_adjust_conjunct_order_by_cost to TQueryOptions
    
    fe: add session variable enable_adjust_conjunct_order_by_cost
    
    fe: forward enable_adjust_conjunct_order_by_cost via toThrift if supported
    
    update
    
    update
    
    format
    
    update
    
    Update be/src/vec/exprs/vexpr_context.cpp
    
    Co-authored-by: Copilot <[email protected]>
    
    Update be/src/vec/exprs/vcompound_pred.h
    
    Co-authored-by: Copilot <[email protected]>
    
    Update be/src/vec/exprs/vectorized_fn_call.cpp
    
    Co-authored-by: Copilot <[email protected]>
    
    update
    
    update
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp        |  3 +--
 be/src/pipeline/exec/operator.cpp                         | 15 +++++++++------
 be/src/pipeline/exec/scan_operator.cpp                    | 12 +++++++++---
 be/src/pipeline/exec/streaming_aggregation_operator.cpp   |  1 -
 be/src/pipeline/exec/streaming_aggregation_operator.h     |  1 -
 be/src/runtime/runtime_state.h                            |  5 +++++
 be/src/vec/exec/format/parquet/vparquet_group_reader.cpp  |  6 ++++++
 be/src/vec/exprs/vcolumn_ref.h                            |  2 ++
 be/src/vec/exprs/vcompound_pred.h                         |  8 ++++++++
 be/src/vec/exprs/vectorized_fn_call.cpp                   | 13 +++++++++++++
 be/src/vec/exprs/vectorized_fn_call.h                     |  1 +
 be/src/vec/exprs/vexpr.h                                  |  8 ++++++++
 be/src/vec/exprs/vexpr_context.cpp                        |  9 +++++++++
 be/src/vec/exprs/vexpr_context.h                          |  4 +++-
 be/src/vec/exprs/vliteral.h                               |  2 ++
 be/src/vec/exprs/vruntimefilter_wrapper.h                 |  2 ++
 be/src/vec/exprs/vslot_ref.h                              |  2 ++
 be/src/vec/functions/function.h                           |  4 ++++
 be/src/vec/functions/functions_comparison.h               |  2 ++
 .../main/java/org/apache/doris/qe/SessionVariable.java    |  4 ++++
 gensrc/thrift/PaloInternalService.thrift                  |  2 ++
 21 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ddb4bbfbe6f..9239e13f7d4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -734,8 +734,7 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int 
operator_id, int dest_i
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase),
           _pool(pool),
           _limit(tnode.limit),
-          _have_conjuncts((tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()) ||
-                          (tnode.__isset.conjuncts && 
!tnode.conjuncts.empty())),
+          _have_conjuncts(tnode.__isset.conjuncts && !tnode.conjuncts.empty()),
           _partition_exprs(
                   tnode.__isset.distribute_expr_lists &&
                                   (require_bucket_distribution ||
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index cf6327bcf0c..26b63443dd6 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -179,7 +179,7 @@ std::string OperatorXBase::debug_string(RuntimeState* 
state, int indentation_lev
     return 
state->get_local_state(operator_id())->debug_string(indentation_level);
 }
 
-Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
+Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) {
     std::string node_name = print_plan_node_type(tnode.node_type);
     _nereids_id = tnode.nereids_id;
     if (!tnode.intermediate_output_tuple_id_list.empty()) {
@@ -199,11 +199,9 @@ Status OperatorXBase::init(const TPlanNode& tnode, 
RuntimeState* /*state*/) {
     _op_name = substr + "_OPERATOR";
 
     if (tnode.__isset.vconjunct) {
-        vectorized::VExprContextSPtr context;
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, 
context));
-        _conjuncts.emplace_back(context);
+        return Status::InternalError("vconjunct is not supported yet");
     } else if (tnode.__isset.conjuncts) {
-        for (auto& conjunct : tnode.conjuncts) {
+        for (const auto& conjunct : tnode.conjuncts) {
             vectorized::VExprContextSPtr context;
             RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, 
context));
             _conjuncts.emplace_back(context);
@@ -211,7 +209,6 @@ Status OperatorXBase::init(const TPlanNode& tnode, 
RuntimeState* /*state*/) {
     }
 
     // create the projections expr
-
     if (tnode.__isset.projections) {
         DCHECK(tnode.__isset.output_tuple_id);
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, 
_projections));
@@ -232,6 +229,12 @@ Status OperatorXBase::prepare(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
     }
+    if (state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    };
+
     for (int i = 0; i < _intermediate_projections.size(); i++) {
         
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
                                                    intermediate_row_desc(i)));
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 7b08da8c9c2..504c821ce36 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -77,8 +77,14 @@ Status 
ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
                                                               int& 
arrived_rf_num) {
     // Lock needed because _conjuncts can be accessed concurrently by multiple 
scanner threads
     std::unique_lock lock(_conjuncts_lock);
-    return _helper.try_append_late_arrival_runtime_filter(state, 
_parent->row_descriptor(),
-                                                          arrived_rf_num, 
_conjuncts);
+    RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, 
_parent->row_descriptor(),
+                                                                   
arrived_rf_num, _conjuncts));
+    if (state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::sort(_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    };
+    return Status::OK();
 }
 
 Status ScanLocalStateBase::clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
scanner_conjuncts) {
@@ -312,7 +318,7 @@ Status 
ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
                 message += conjunct->root()->debug_string();
             }
         }
-        custom_profile()->add_info_string("RemainedDownPredicates", message);
+        custom_profile()->add_info_string("RemainedPredicates", message);
     }
 
     for (auto& it : _slot_id_to_value_range) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 5959ef2cf03..c8568930176 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -825,7 +825,6 @@ StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool* 
pool, int operator_id,
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase),
-          _have_conjuncts(tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples),
           _partition_exprs(
                   tnode.__isset.distribute_expr_lists &&
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h
index ce45fe97c22..26ce294f8a9 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -265,7 +265,6 @@ private:
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     bool _can_short_circuit = false;
     std::vector<size_t> _make_nullable_keys;
-    bool _have_conjuncts;
     RowDescriptor _agg_fn_output_row_descriptor;
 
     // For sort limit
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cb80c4f1ba9..86ccb003279 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -125,6 +125,11 @@ public:
                                                            : 
_query_options.mem_limit / 20;
     }
 
+    bool enable_adjust_conjunct_order_by_cost() const {
+        return _query_options.__isset.enable_adjust_conjunct_order_by_cost &&
+               _query_options.enable_adjust_conjunct_order_by_cost;
+    }
+
     int32_t max_column_reader_num() const {
         return _query_options.__isset.max_column_reader_num ? 
_query_options.max_column_reader_num
                                                             : 20000;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7532a1259f7..88d083e10d2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -188,6 +188,12 @@ Status RowGroupReader::init(
                                  
_lazy_read_ctx.missing_columns_conjuncts.end());
         RETURN_IF_ERROR(_rewrite_dict_predicates());
     }
+    // _state is nullptr in some ut.
+    if (_state && _state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::sort(_filter_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h
index 83dba5e3da2..3f4412cf12c 100644
--- a/be/src/vec/exprs/vcolumn_ref.h
+++ b/be/src/vec/exprs/vcolumn_ref.h
@@ -89,6 +89,8 @@ public:
         return out.str();
     }
 
+    double execute_cost() const override { return 0.0; }
+
 private:
     int _column_id;
     std::atomic<int> _gap = 0;
diff --git a/be/src/vec/exprs/vcompound_pred.h b/be/src/vec/exprs/vcompound_pred.h
index c1ba88fa2af..2ba4216847b 100644
--- a/be/src/vec/exprs/vcompound_pred.h
+++ b/be/src/vec/exprs/vcompound_pred.h
@@ -347,6 +347,14 @@ public:
         return Status::OK();
     }
 
+    double execute_cost() const override {
+        double cost = 0.3;
+        for (const auto& child : _children) {
+            cost += child->execute_cost();
+        }
+        return cost;
+    }
+
 private:
     static inline constexpr uint8_t apply_and_null(UInt8 a, UInt8 l_null, 
UInt8 b, UInt8 r_null) {
         // (<> && false) is false, (true && NULL) is NULL
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp
index 0d4bc252394..cf6dc8fc6db 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -26,6 +26,7 @@
 #include <ostream>
 
 #include "common/config.h"
+#include "common/exception.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/utils.h"
@@ -641,5 +642,17 @@ Status VectorizedFnCall::evaluate_ann_range_search(
     return Status::OK();
 }
 
+double VectorizedFnCall::execute_cost() const {
+    if (!_function) {
+        throw Exception(
+                Status::InternalError("Function is null in expression: {}", 
this->debug_string()));
+    }
+    double cost = _function->execute_cost();
+    for (const auto& child : _children) {
+        cost += child->execute_cost();
+    }
+    return cost;
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h
index 2963e35931f..8da62455365 100644
--- a/be/src/vec/exprs/vectorized_fn_call.h
+++ b/be/src/vec/exprs/vectorized_fn_call.h
@@ -65,6 +65,7 @@ public:
     const std::string& expr_name() const override;
     std::string function_name() const;
     std::string debug_string() const override;
+    double execute_cost() const override;
     bool is_blockable() const override {
         return _function->is_blockable() ||
                std::any_of(_children.begin(), _children.end(),
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 31812816e9b..efc39fffc9b 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -277,6 +277,14 @@ public:
         return expr;
     }
 
+    virtual double execute_cost() const {
+        double cost = 1.0;
+        for (const auto& child : _children) {
+            cost += child->execute_cost();
+        }
+        return cost;
+    }
+
     // If this expr is a RuntimeFilterWrapper, this method will return an 
underlying rf expression
     virtual VExprSPtr get_impl() const { return {}; }
 
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index de40e9dbe87..63e8deb7aee 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -473,5 +473,14 @@ uint64_t VExprContext::get_digest(uint64_t seed) const {
     return _root->get_digest(seed);
 }
 
+double VExprContext::execute_cost() const {
+    if (_root == nullptr) {
+        // When there is no expression root, treat the cost as a base value.
+        // This avoids null dereferences while keeping a deterministic cost.
+        return 0.0;
+    }
+    return _root->execute_cost();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index de3fe02612c..98a1491e5d6 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -201,7 +201,9 @@ public:
 
     [[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result);
 
-    VExprSPtr root() const { return _root; }
+    double execute_cost() const;
+
+    VExprSPtr root() { return _root; }
     void set_root(const VExprSPtr& expr) { _root = expr; }
     void set_index_context(std::shared_ptr<IndexExecContext> index_context) {
         _index_context = std::move(index_context);
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 72f1d90bbbe..a77ed5dafd0 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -56,6 +56,8 @@ public:
     const std::string& expr_name() const override { return _expr_name; }
     std::string debug_string() const override;
 
+    double execute_cost() const override { return 0.0; }
+
     MOCK_FUNCTION std::string value(const DataTypeSerDe::FormatOptions& 
options) const;
 
     const ColumnPtr& get_column_ptr() const { return _column_ptr; }
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index f9b6203ed6c..d3c42df23f4 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -64,6 +64,8 @@ public:
     const VExprSPtrs& children() const override { return _impl->children(); }
     TExprNodeType::type node_type() const override { return 
_impl->node_type(); }
 
+    double execute_cost() const override { return _impl->execute_cost(); }
+
     Status execute_filter(VExprContext* context, const Block* block,
                           uint8_t* __restrict result_filter_data, size_t rows, 
bool accept_null,
                           bool* can_filter_all) const override;
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index ff7e3812d76..a5feb3c999a 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -72,6 +72,8 @@ public:
 
     uint64_t get_digest(uint64_t seed) const override;
 
+    double execute_cost() const override { return 0.0; }
+
 private:
     int _slot_id;
     int _column_id;
diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h
index ca872a147a7..e5a692e473d 100644
--- a/be/src/vec/functions/function.h
+++ b/be/src/vec/functions/function.h
@@ -170,6 +170,8 @@ public:
     virtual const DataTypes& get_argument_types() const = 0;
     virtual const DataTypePtr& get_return_type() const = 0;
 
+    virtual double execute_cost() const { return 1.0; }
+
     /// Do preparations and return executable.
     /// sample_block should contain data types of arguments and values of 
constants, if relevant.
     virtual PreparedFunctionPtr prepare(FunctionContext* context, const Block& 
sample_block,
@@ -451,6 +453,8 @@ public:
         return function;
     }
 
+    double execute_cost() const override { return function->execute_cost(); }
+
     Status open(FunctionContext* context, FunctionContext::FunctionStateScope 
scope) override {
         return function->open(context, scope);
     }
diff --git a/be/src/vec/functions/functions_comparison.h b/be/src/vec/functions/functions_comparison.h
index 8c0d56e74ea..fc831997681 100644
--- a/be/src/vec/functions/functions_comparison.h
+++ b/be/src/vec/functions/functions_comparison.h
@@ -272,6 +272,8 @@ public:
 
     FunctionComparison() = default;
 
+    double execute_cost() const override { return 0.5; }
+
 private:
     template <PrimitiveType PT>
     Status execute_num_type(Block& block, uint32_t result, const ColumnPtr& 
col_left_ptr,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9e3f0112508..4ef012cb20a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3164,6 +3164,9 @@ public class SessionVariable implements Serializable, 
Writable {
     })
     public boolean enablePhraseQuerySequentialOpt = true;
 
+    @VariableMgr.VarAttr(name = "enable_adjust_conjunct_order_by_cost", 
needForward = true)
+    public boolean enableAdjustConjunctOrderByCost = true;
+
     @VariableMgr.VarAttr(name = REQUIRE_SEQUENCE_IN_INSERT, needForward = 
true, description = {
             "该变量用于控制,使用了 sequence 列的 unique key 表,insert into 操作是否要求必须提供每一行的 
sequence 列的值",
             "This variable controls whether the INSERT INTO operation on 
unique key tables with a sequence"
@@ -5186,6 +5189,7 @@ public class SessionVariable implements Serializable, 
Writable {
         } else {
             
tResult.setFileCacheQueryLimitPercent(Config.file_cache_query_limit_max_percent);
         }
+        
tResult.setEnableAdjustConjunctOrderByCost(enableAdjustConjunctOrderByCost);
 
         // Set Iceberg write target file size
         
tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 43ccf013d9c..dbfc8f2de76 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -432,6 +432,8 @@ struct TQueryOptions {
 
   188: optional bool enable_broadcast_join_force_passthrough;
 
+  200: optional bool enable_adjust_conjunct_order_by_cost
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to