This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch ckb2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c67aa4f3ca029d3597d610b5c68bda7cdca10d03
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Mar 5 16:34:33 2026 +0800

    adjust single streaming threshold
    
    update
    
    update
---
 .../distinct_streaming_aggregation_operator.cpp    | 128 ++++++++++++---------
 .../distinct_streaming_aggregation_operator.h      |   2 +
 .../operator/streaming_aggregation_operator.cpp    | 126 +++++++++++---------
 .../exec/operator/streaming_aggregation_operator.h |   1 +
 be/src/runtime/fragment_mgr.cpp                    |   5 +
 be/src/runtime/query_context.h                     |   8 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |   2 +
 .../org/apache/doris/qe/CoordinatorContext.java    |   5 +
 .../doris/qe/runtime/ThriftPlansBuilder.java       |   5 +
 gensrc/thrift/PaloInternalService.thrift           |   5 +
 10 files changed, 174 insertions(+), 113 deletions(-)

diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp 
b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
index 48a43ca4c39..d79388c2118 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 256KB or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
+
 static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
         sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
@@ -62,7 +73,8 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
           batch_size(state->batch_size()),
           _agg_data(std::make_unique<DistinctDataVariants>()),
           _child_block(Block::create_unique()),
-          _aggregated_block(Block::create_unique()) {}
+          _aggregated_block(Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -98,62 +110,64 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
     }
 
     return std::visit(
-            Overload {
-                    [&](std::monostate& arg) -> bool {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
-                    },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
-                        }
-
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
-                        }
-
-                        // Compare the number of rows in the hash table with 
the number of input rows that
-                        // were aggregated into it. Exclude passed through 
rows from this calculation since
-                        // they were not in hash tables.
-                        const int64_t input_rows = _input_num_rows;
-                        const int64_t aggregated_input_rows = input_rows - 
_num_rows_returned;
-                        // TODO chenhao
-                        //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
-                        double current_reduction = 
static_cast<double>(aggregated_input_rows) /
-                                                   
static_cast<double>(ht_rows);
-
-                        // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
-                        // inaccurate, which could lead to a divide by zero 
below.
-                        if (aggregated_input_rows <= 0) {
-                            return true;
-                        }
-
-                        // Extrapolate the current reduction factor (r) using 
the formula
-                        // R = 1 + (N / n) * (r - 1), where R is the reduction 
factor over the full input data
-                        // set, N is the number of input rows, excluding 
passed-through rows, and n is the
-                        // number of rows inserted or merged into the hash 
tables. This is a very rough
-                        // approximation but is good enough to be useful.
-                        // TODO: consider collecting more statistics to better 
estimate reduction.
-                        //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
-                        //      ? current_reduction
-                        //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-                        //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
-                        //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
-                        //  return estimated_reduction > min_reduction;
-                        _should_expand_hash_table = current_reduction > 
min_reduction;
-                        return _should_expand_hash_table;
-                    }},
+            Overload {[&](std::monostate& arg) -> bool {
+                          throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                          return false;
+                      },
+                      [&](auto& agg_method) -> bool {
+                          auto& hash_tbl = *agg_method.hash_table;
+                          auto [ht_mem, ht_rows] =
+                                  std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
+
+                          // Need some rows in tables to have valid statistics.
+                          if (ht_rows == 0) {
+                              return true;
+                          }
+
+                          const auto* reduction = _is_single_backend
+                                                          ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                          : 
STREAMING_HT_MIN_REDUCTION;
+
+                          // Find the appropriate reduction factor in our 
table for the current hash table sizes.
+                          int cache_level = 0;
+                          while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
+                                 ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
+                              ++cache_level;
+                          }
+
+                          // Compare the number of rows in the hash table with 
the number of input rows that
+                          // were aggregated into it. Exclude passed through 
rows from this calculation since
+                          // they were not in hash tables.
+                          const int64_t input_rows = _input_num_rows;
+                          const int64_t aggregated_input_rows = input_rows - 
_num_rows_returned;
+                          // TODO chenhao
+                          //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
+                          double current_reduction = 
static_cast<double>(aggregated_input_rows) /
+                                                     
static_cast<double>(ht_rows);
+
+                          // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
+                          // inaccurate, which could lead to a divide by zero 
below.
+                          if (aggregated_input_rows <= 0) {
+                              return true;
+                          }
+
+                          // Extrapolate the current reduction factor (r) 
using the formula
+                          // R = 1 + (N / n) * (r - 1), where R is the 
reduction factor over the full input data
+                          // set, N is the number of input rows, excluding 
passed-through rows, and n is the
+                          // number of rows inserted or merged into the hash 
tables. This is a very rough
+                          // approximation but is good enough to be useful.
+                          // TODO: consider collecting more statistics to 
better estimate reduction.
+                          //  double estimated_reduction = 
aggregated_input_rows >= expected_input_rows
+                          //      ? current_reduction
+                          //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
+                          double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
+
+                          //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
+                          //    
COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
+                          //  return estimated_reduction > min_reduction;
+                          _should_expand_hash_table = current_reduction > 
min_reduction;
+                          return _should_expand_hash_table;
+                      }},
             _agg_data->method_variant);
 }
 
diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h 
b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
index 392da56e89b..07e015af86e 100644
--- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h
+++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h
@@ -84,6 +84,8 @@ private:
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
     RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+
+    bool _is_single_backend = false;
 };
 
 class DistinctStreamingAggOperatorX final
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp 
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index 96d416aeb7e..4458415b1a5 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
 // of the machine that we're running on.
 static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
         // Expand up to L2 cache always.
-        {0, 0.0},
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
         // Expand into main memory if we're getting a significant reduction.
         // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 256KB or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
 };
 
 static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
@@ -81,6 +92,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_data(std::make_unique<AggregatedDataVariants>()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()),
           _child_block(Block::create_unique()),
           _pre_aggregated_block(Block::create_unique()) {}
 
@@ -220,62 +232,64 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
     }
 
     return std::visit(
-            Overload {
-                    [&](std::monostate& arg) -> bool {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                        return false;
-                    },
-                    [&](auto& agg_method) -> bool {
-                        auto& hash_tbl = *agg_method.hash_table;
-                        auto [ht_mem, ht_rows] =
-                                std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
-
-                        // Need some rows in tables to have valid statistics.
-                        if (ht_rows == 0) {
-                            return true;
-                        }
+            Overload {[&](std::monostate& arg) -> bool {
+                          throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                          return false;
+                      },
+                      [&](auto& agg_method) -> bool {
+                          auto& hash_tbl = *agg_method.hash_table;
+                          auto [ht_mem, ht_rows] =
+                                  std::pair 
{hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
 
-                        // Find the appropriate reduction factor in our table 
for the current hash table sizes.
-                        int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
-                            ++cache_level;
-                        }
+                          // Need some rows in tables to have valid statistics.
+                          if (ht_rows == 0) {
+                              return true;
+                          }
 
-                        // Compare the number of rows in the hash table with 
the number of input rows that
-                        // were aggregated into it. Exclude passed through 
rows from this calculation since
-                        // they were not in hash tables.
-                        const int64_t input_rows = _input_num_rows;
-                        const int64_t aggregated_input_rows = input_rows - 
_cur_num_rows_returned;
-                        // TODO chenhao
-                        //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
-                        double current_reduction = 
static_cast<double>(aggregated_input_rows) /
-                                                   
static_cast<double>(ht_rows);
-
-                        // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
-                        // inaccurate, which could lead to a divide by zero 
below.
-                        if (aggregated_input_rows <= 0) {
-                            return true;
-                        }
+                          const auto* reduction = _is_single_backend
+                                                          ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                          : 
STREAMING_HT_MIN_REDUCTION;
 
-                        // Extrapolate the current reduction factor (r) using 
the formula
-                        // R = 1 + (N / n) * (r - 1), where R is the reduction 
factor over the full input data
-                        // set, N is the number of input rows, excluding 
passed-through rows, and n is the
-                        // number of rows inserted or merged into the hash 
tables. This is a very rough
-                        // approximation but is good enough to be useful.
-                        // TODO: consider collecting more statistics to better 
estimate reduction.
-                        //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
-                        //      ? current_reduction
-                        //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
-
-                        //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
-                        //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
-                        //  return estimated_reduction > min_reduction;
-                        _should_expand_hash_table = current_reduction > 
min_reduction;
-                        return _should_expand_hash_table;
-                    }},
+                          // Find the appropriate reduction factor in our 
table for the current hash table sizes.
+                          int cache_level = 0;
+                          while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
+                                 ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
+                              ++cache_level;
+                          }
+
+                          // Compare the number of rows in the hash table with 
the number of input rows that
+                          // were aggregated into it. Exclude passed through 
rows from this calculation since
+                          // they were not in hash tables.
+                          const int64_t input_rows = _input_num_rows;
+                          const int64_t aggregated_input_rows = input_rows - 
_cur_num_rows_returned;
+                          // TODO chenhao
+                          //  const int64_t expected_input_rows = 
estimated_input_cardinality_ - num_rows_returned_;
+                          double current_reduction = 
static_cast<double>(aggregated_input_rows) /
+                                                     
static_cast<double>(ht_rows);
+
+                          // TODO: workaround for IMPALA-2490: subplan node 
rows_returned counter may be
+                          // inaccurate, which could lead to a divide by zero 
below.
+                          if (aggregated_input_rows <= 0) {
+                              return true;
+                          }
+
+                          // Extrapolate the current reduction factor (r) 
using the formula
+                          // R = 1 + (N / n) * (r - 1), where R is the 
reduction factor over the full input data
+                          // set, N is the number of input rows, excluding 
passed-through rows, and n is the
+                          // number of rows inserted or merged into the hash 
tables. This is a very rough
+                          // approximation but is good enough to be useful.
+                          // TODO: consider collecting more statistics to 
better estimate reduction.
+                          //  double estimated_reduction = 
aggregated_input_rows >= expected_input_rows
+                          //      ? current_reduction
+                          //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
+                          double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
+
+                          //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
+                          //    
COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
+                          //  return estimated_reduction > min_reduction;
+                          _should_expand_hash_table = current_reduction > 
min_reduction;
+                          return _should_expand_hash_table;
+                      }},
             _agg_data->method_variant);
 }
 
diff --git a/be/src/exec/operator/streaming_aggregation_operator.h 
b/be/src/exec/operator/streaming_aggregation_operator.h
index f72d6dce24a..abc905cbfbd 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.h
+++ b/be/src/exec/operator/streaming_aggregation_operator.h
@@ -110,6 +110,7 @@ private:
     std::vector<uint8_t> cmp_res;
     std::vector<int> order_directions;
     std::vector<int> null_directions;
+    bool _is_single_backend = false;
 
     struct HeapLimitCursor {
         HeapLimitCursor(int row_id, MutableColumns& limit_columns,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b8298174f3e..10c4a7a96b6 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -877,6 +877,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, 
query_ctx));
     SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
+    // Set single_backend_query before prepare() so that pipeline local states
+    // (e.g. StreamingAggLocalState) can read the correct value in their 
constructors.
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        
params.query_options.__isset.single_backend_query &&
+                                        
params.query_options.single_backend_query);
     int64_t duration_ns = 0;
     std::shared_ptr<PipelineFragmentContext> context = 
std::make_shared<PipelineFragmentContext>(
             query_ctx->query_id(), params, query_ctx, _exec_env, cb,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1326d0bdfbe..aa1746ed8b0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -107,6 +107,12 @@ public:
         return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     }
 
+    bool is_single_backend_query() const { return _is_single_backend_query; }
+
+    void set_single_backend_query(bool is_single_backend_query) {
+        _is_single_backend_query = is_single_backend_query;
+    }
+
     int64_t get_remaining_query_time_seconds() const {
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -381,6 +387,8 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    bool _is_single_backend_query = false;
+
     // file cache context holders
     std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> 
_query_context_holders;
     // instance id + node id -> cte scan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0f002a3f15..f0705f8e195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface {
             // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
             boolean twoPhaseExecution = fragments.size() > 1;
+            boolean isSingleBackend = addressToBackendID.size() == 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
@@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface {
                     
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
                     
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
                     
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+                    
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
                     
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
 
                     
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), 
pipelineExecContext);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index f3c825cf9d8..f1a124f487c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
     public final Supplier<Set<TUniqueId>> instanceIds = 
Suppliers.memoize(this::getInstanceIds);
     public final Supplier<Map<TNetworkAddress, Long>> backends = 
Suppliers.memoize(this::getBackends);
     public final Supplier<Integer> scanRangeNum = 
Suppliers.memoize(this::getScanRangeNum);
+    public final Supplier<Boolean> isSingleBackendQuery = 
Suppliers.memoize(this::computeIsSingleBackendQuery);
     public final Supplier<TNetworkAddress> directConnectFrontendAddress
             = Suppliers.memoize(this::computeDirectConnectCoordinator);
 
@@ -447,6 +448,10 @@ public class CoordinatorContext {
         return scanRangeNum;
     }
 
+    private boolean computeIsSingleBackendQuery() {
+        return backends.get().size() == 1;
+    }
+
     private int computeScanRangeNumByScanRange(TScanRangeParams param) {
         int scanRangeNum = 0;
         TScanRange scanRange = param.getScanRange();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 6ffa8de61a4..0738876282f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -106,6 +106,11 @@ public class ThriftPlansBuilder {
         Set<Integer> fragmentToNotifyClose = 
setParamsForRecursiveCteNode(distributedPlans,
                 coordinatorContext.runtimeFilters);
 
+        // Determine whether this query is assigned to a single backend and 
propagate it to
+        // TQueryOptions so that BE can apply more appropriate optimization 
strategies (e.g.
+        // streaming aggregation hash table thresholds).
+        
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
         // we should set runtime predicate first, then we can use heap sort 
and to thrift
         setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..495c1477647 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,11 @@ struct TQueryOptions {
   // Use paimon-cpp to read Paimon splits on BE
   201: optional bool enable_paimon_cpp_reader = false;
 
+  // Whether all fragments of this query are assigned to a single backend.
+  // When true, the streaming aggregation operator applies stricter (higher
+  // min-reduction) hash table expansion thresholds, favoring streaming pass-through.
+  202: optional bool single_backend_query = false;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to