This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 840297f5a1e9ee1aa1b94abf3268dc4ea949a13c
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 21:27:52 2026 +0800

    [Chore](pick) pick changes from PR #61104 and PR #60941 (#61303)
    
    pick changes from PR #61104 and PR #60941
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  56 ++--
 .../pipeline/exec/aggregation_source_operator.cpp  |   7 +-
 .../distinct_streaming_aggregation_operator.cpp    |  44 +--
 .../exec/distinct_streaming_aggregation_operator.h |   3 +-
 be/src/pipeline/exec/streaming_agg_min_reduction.h |  76 +++++
 .../exec/streaming_aggregation_operator.cpp        |  64 +---
 .../pipeline/exec/streaming_aggregation_operator.h |   2 +
 be/src/runtime/fragment_mgr.cpp                    |   5 +
 be/src/runtime/query_context.h                     |   8 +
 be/src/vec/common/hash_table/hash_map_context.h    | 369 ++++++++++++++++++++-
 be/src/vec/common/hash_table/string_hash_table.h   |  22 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |   2 +
 .../org/apache/doris/qe/CoordinatorContext.java    |   5 +
 .../doris/qe/runtime/ThriftPlansBuilder.java       |   5 +
 gensrc/thrift/PaloInternalService.thrift           |   5 +
 15 files changed, 547 insertions(+), 126 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index f0f74bc2592..aca5c500d19 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -575,10 +575,9 @@ void 
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
                            };
 
                            SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                    
creator_for_null_key);
-                           }
+                           vectorized::lazy_emplace_batch(
+                                   agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                   [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                            COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                        }},
@@ -660,10 +659,10 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                             };
 
                             SCOPED_TIMER(_hash_table_emplace_timer);
-                            for (i = 0; i < num_rows; ++i) {
-                                places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                     
creator_for_null_key);
-                            }
+                            vectorized::lazy_emplace_batch(
+                                    agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                    [&](uint32_t row) { i = row; },
+                                    [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
                             COUNTER_UPDATE(_hash_table_input_counter, 
num_rows);
                             return true;
                         }
@@ -675,27 +674,26 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
 void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* 
places,
                                             vectorized::ColumnRawPtrs& 
key_columns,
                                             uint32_t num_rows) {
-    std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
-                                         throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                                                "uninited hash 
table");
-                                     },
-                                     [&](auto& agg_method) -> void {
-                                         using HashMethodType = 
std::decay_t<decltype(agg_method)>;
-                                         using AggState = typename 
HashMethodType::State;
-                                         AggState state(key_columns);
-                                         
agg_method.init_serialized_keys(key_columns, num_rows);
-
-                                         /// For all rows.
-                                         for (size_t i = 0; i < num_rows; ++i) 
{
-                                             auto find_result = 
agg_method.find(state, i);
-
-                                             if (find_result.is_found()) {
-                                                 places[i] = 
find_result.get_mapped();
-                                             } else {
-                                                 places[i] = nullptr;
-                                             }
-                                         }
-                                     }},
+    std::visit(vectorized::Overload {
+                       [&](std::monostate& arg) -> void {
+                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                       },
+                       [&](auto& agg_method) -> void {
+                           using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                           using AggState = typename HashMethodType::State;
+                           AggState state(key_columns);
+                           agg_method.init_serialized_keys(key_columns, 
num_rows);
+
+                           /// For all rows.
+                           vectorized::find_batch(agg_method, state, num_rows,
+                                                  [&](uint32_t row, auto& 
find_result) {
+                                                      if 
(find_result.is_found()) {
+                                                          places[row] = 
find_result.get_mapped();
+                                                      } else {
+                                                          places[row] = 
nullptr;
+                                                      }
+                                                  });
+                       }},
                _agg_data->method_variant);
 }
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index b8ecd137016..a8062b3c2f5 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -590,10 +590,9 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                         };
 
                         SCOPED_TIMER(_hash_table_emplace_timer);
-                        for (size_t i = 0; i < num_rows; ++i) {
-                            places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                 
creator_for_null_key);
-                        }
+                        vectorized::lazy_emplace_batch(
+                                agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                [&](uint32_t row, auto& mapped) { places[row] 
= mapped; });
 
                         COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                         COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 972fc9ba923..3fed13adfbc 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "pipeline/exec/streaming_agg_min_reduction.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 
 namespace doris {
@@ -32,29 +33,6 @@ class RuntimeState;
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
-struct StreamingHtMinReductionEntry {
-    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
-    // bytes is greater than this threshold.
-    int min_ht_mem;
-    // The minimum reduction factor to expand the hash tables.
-    double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the 
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
-        // Expand up to L2 cache always.
-        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
-        // Expand into L3 cache if we look like we're getting some reduction.
-        // At present, The L2 cache is generally 1024k or more
-        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
-        // Expand into main memory if we're getting a significant reduction.
-        // The L3 cache is generally 16MB or more
-        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
-        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
                                                                OperatorXBase* 
parent)
@@ -62,7 +40,8 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
           batch_size(state->batch_size()),
           _agg_data(std::make_unique<DistinctDataVariants>()),
           _child_block(vectorized::Block::create_unique()),
-          _aggregated_block(vectorized::Block::create_unique()) {}
+          _aggregated_block(vectorized::Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +92,14 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction =
+                                _is_single_backend ? 
doris::SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                   : 
doris::STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                        while (cache_level + 1 < 
doris::STREAMING_HT_MIN_REDUCTION_SIZE &&
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -145,8 +128,7 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
@@ -310,9 +292,9 @@ void 
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
                         auto creator_for_null_key = [&]() { 
distinct_row.push_back(row); };
 
                         SCOPED_TIMER(_hash_table_emplace_timer);
-                        for (; row < num_rows; ++row) {
-                            agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                        }
+                        vectorized::lazy_emplace_batch_void(agg_method, state, 
num_rows, creator,
+                                                            
creator_for_null_key,
+                                                            [&](uint32_t r) { 
row = r; });
 
                         COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                     }},
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d4f7a08136b..8b7898857d6 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -87,6 +87,7 @@ private:
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
     RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+    bool _is_single_backend = false;
 };
 
 class DistinctStreamingAggOperatorX final
@@ -157,4 +158,4 @@ private:
 
 } // namespace pipeline
 } // namespace doris
-#include "common/compile_check_end.h"
\ No newline at end of file
+#include "common/compile_check_end.h"
diff --git a/be/src/pipeline/exec/streaming_agg_min_reduction.h 
b/be/src/pipeline/exec/streaming_agg_min_reduction.h
new file mode 100644
index 00000000000..df2acc66ec8
--- /dev/null
+++ b/be/src/pipeline/exec/streaming_agg_min_reduction.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris {
+
+/// The minimum reduction factor (input rows divided by output rows) to grow 
hash tables
+/// in a streaming preaggregation, given that the hash tables are currently 
the given
+/// size or above. The sizes roughly correspond to hash table sizes where the 
bucket
+/// arrays will fit in a cache level. Intuitively, we don't want the working 
set of the
+/// aggregation to expand to the next level of cache unless we're reducing the 
input
+/// enough to outweigh the increased memory latency we'll incur for each hash 
table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of 
the
+/// final reduction. It may be biased either way depending on the ordering of 
the
+/// input. If the input order is random, we will underestimate the final 
reduction
+/// factor because the probability of a row having the same key as a previous 
row
+/// increases as more input is processed.  If the input order is correlated 
with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we 
underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final 
reduction
+/// using the planner's estimated input cardinality and the assumption that 
input
+/// is in a random order. This means that we assume that the reduction factor 
will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
+    // bytes is greater than this threshold.
+    int min_ht_mem;
+    // The minimum reduction factor to expand the hash tables.
+    double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the 
cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 256k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always.
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 256k or more
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 4.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 5.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+} // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 6c0c412f819..9ff1461d0d6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -25,6 +25,7 @@
 #include "common/cast_set.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/streaming_agg_min_reduction.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vslot_ref.h"
@@ -35,54 +36,13 @@ class RuntimeState;
 } // namespace doris
 
 namespace doris::pipeline {
-/// The minimum reduction factor (input rows divided by output rows) to grow 
hash tables
-/// in a streaming preaggregation, given that the hash tables are currently 
the given
-/// size or above. The sizes roughly correspond to hash table sizes where the 
bucket
-/// arrays will fit in  a cache level. Intuitively, we don't want the working 
set of the
-/// aggregation to expand to the next level of cache unless we're reducing the 
input
-/// enough to outweigh the increased memory latency we'll incur for each hash 
table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of 
the
-/// final reduction. It may be biased either way depending on the ordering of 
the
-/// input. If the input order is random, we will underestimate the final 
reduction
-/// factor because the probability of a row having the same key as a previous 
row
-/// increases as more input is processed.  If the input order is correlated 
with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we 
underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final 
reduction
-/// using the planner's estimated input cardinality and the assumption that 
input
-/// is in a random order. This means that we assume that the reduction factor 
will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
-    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket 
directories in
-    // bytes is greater than this threshold.
-    int min_ht_mem;
-    // The minimum reduction factor to expand the hash tables.
-    double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the 
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
-        // Expand up to L2 cache always.
-        {0, 0.0},
-        // Expand into L3 cache if we look like we're getting some reduction.
-        // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
-        // Expand into main memory if we're getting a significant reduction.
-        // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
-        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_data(std::make_unique<AggregatedDataVariants>()),
           _child_block(vectorized::Block::create_unique()),
-          _pre_aggregated_block(vectorized::Block::create_unique()) {}
+          _pre_aggregated_block(vectorized::Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) 
{
     RETURN_IF_ERROR(Base::init(state, info));
@@ -236,10 +196,14 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction =
+                                _is_single_backend ? 
doris::SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                   : 
doris::STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
-                        while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                        while (cache_level + 1 < 
doris::STREAMING_HT_MIN_REDUCTION_SIZE &&
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -268,8 +232,7 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
@@ -806,10 +769,9 @@ void 
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
                            };
 
                            SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = *agg_method.lazy_emplace(state, i, 
creator,
-                                                                    
creator_for_null_key);
-                           }
+                           vectorized::lazy_emplace_batch(
+                                   agg_method, state, num_rows, creator, 
creator_for_null_key,
+                                   [&](uint32_t row, auto& mapped) { 
places[row] = mapped; });
 
                            COUNTER_UPDATE(_hash_table_input_counter, num_rows);
                        }},
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index a9e8cc54ba8..3632bea9430 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -199,6 +199,8 @@ private:
                                       }},
                 _agg_data->method_variant);
     }
+
+    bool _is_single_backend = false;
 };
 
 class StreamingAggOperatorX MOCK_REMOVE(final) : public 
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6bf60dbdfc5..6cd0ee1c7c1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -865,6 +865,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, 
query_ctx));
     SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
+    // Set single_backend_query before prepare() so that pipeline local states
+    // (e.g. StreamingAggLocalState) can read the correct value in their 
constructors.
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        
params.query_options.__isset.single_backend_query &&
+                                        
params.query_options.single_backend_query);
     int64_t duration_ns = 0;
     std::shared_ptr<pipeline::PipelineFragmentContext> context =
             std::make_shared<pipeline::PipelineFragmentContext>(
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6ec5664f8c0..7e086635c9d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -107,6 +107,12 @@ public:
         return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     }
 
+    bool is_single_backend_query() const { return _is_single_backend_query; }
+
+    void set_single_backend_query(bool is_single_backend_query) {
+        _is_single_backend_query = is_single_backend_query;
+    }
+
     int64_t get_remaining_query_time_seconds() const {
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -374,6 +380,8 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    bool _is_single_backend_query = false;
+
     // file cache context holders
     std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> 
_query_context_holders;
 
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index e86381ba3b8..1854209b282 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
     }
 
     template <bool read>
-    ALWAYS_INLINE void prefetch(size_t i) {
+    void prefetch(size_t i) {
         if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
             hash_table->template prefetch<read>(keys[i + 
HASH_MAP_PREFETCH_DIST],
                                                 hash_values[i + 
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
     }
 
     template <typename State>
-    ALWAYS_INLINE auto find(State& state, size_t i) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<true>(i);
-        }
+    auto find(State& state, size_t i) {
+        prefetch<true>(i);
         return state.find_key_with_hash(*hash_table, i, keys[i], 
hash_values[i]);
     }
 
     template <typename State, typename F, typename FF>
-    ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
-                                    FF&& creator_for_null_key) {
-        if constexpr (!is_string_hash_map()) {
-            prefetch<false>(i);
-        }
+    auto lazy_emplace(State& state, size_t i, F&& creator, FF&& 
creator_for_null_key) {
+        prefetch<false>(i);
         return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i], 
creator,
                                       creator_for_null_key);
     }
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 };
 
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+///   group 0: empty strings (size == 0) → m0
+///   group 1: size <= 2 → m1
+///   group 2: size <= 4 → m2
+///   group 3: size <= 8 → m3
+///   group 4: size <= 16 → m4
+///   group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+    static constexpr int NUM_GROUPS = 6;
+    // Row indices for each sub-table group
+    DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+    void build(const StringRef* keys, uint32_t num_rows) {
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].clear();
+        }
+        // First pass: count sizes for each group to reserve memory
+        uint32_t counts[NUM_GROUPS] = {};
+        for (uint32_t i = 0; i < num_rows; i++) {
+            counts[get_group(keys[i])]++;
+        }
+        for (int g = 0; g < NUM_GROUPS; g++) {
+            group_row_indices[g].reserve(counts[g]);
+        }
+        // Second pass: fill group indices
+        for (uint32_t i = 0; i < num_rows; i++) {
+            group_row_indices[get_group(keys[i])].push_back(i);
+        }
+    }
+
+    static ALWAYS_INLINE int get_group(const StringRef& key) {
+        const size_t sz = key.size;
+        if (sz == 0) {
+            return 0;
+        }
+        if (key.data[sz - 1] == 0) {
+            // Trailing zero: goes to the generic long-string table (ms)
+            return 5;
+        }
+        if (sz <= 2) return 1;
+        if (sz <= 4) return 2;
+        if (sz <= 8) return 3;
+        if (sz <= 16) return 4;
+        return 5;
+    }
+};
+
 template <typename TData>
 struct MethodStringNoCache : public MethodBase<TData> {
     using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
     // refresh each time probe
     DorisVector<StringRef> _stored_keys;
 
+    // Sub-table groups for batch operations (only used for non-join 
aggregation path)
+    StringKeySubTableGroups _sub_table_groups;
+
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
                         : (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
         } else {
             Base::init_hash_values(num_rows, null_map);
+            // Build sub-table groups for batch emplace/find (only for 
aggregation, not join)
+            if constexpr (Base::is_string_hash_map()) {
+                _sub_table_groups.build(Base::keys, num_rows);
+            }
         }
     }
 
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
     }
+
    /// Read-only view of the per-group row-index partition built by the last call
    /// to init_serialized_keys() (aggregation path only; never built for joins).
    const StringKeySubTableGroups& get_sub_table_groups() const { return _sub_table_groups; }
+};
+
/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
/// Primary template: any other hash map type yields false.
template <typename HashMap>
struct IsNullableStringHashMap : std::false_type {};

/// Specialization: a StringHashMap wrapped in DataWithNullKey — the variant used
/// when the grouping key column is nullable.
template <typename Mapped, typename Allocator>
struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped, Allocator>>> : std::true_type {
};
 
/// Helper: obtain the string hash table on which to run batch sub-table
/// operations. Pass-through: the argument is returned unchanged (the
/// DataWithNullKey wrapper is expected to expose the submap interface itself —
/// NOTE(review): confirm against its definition).
template <typename HashMap>
auto& get_string_hash_table(HashMap& table) {
    return table;
}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy 
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+    if constexpr (GroupIdx == 0) {
+        return StringRef(origin); // copy — m0 needs non-const Key&
+    } else if constexpr (GroupIdx == 1) {
+        return to_string_key<StringKey2>(origin);
+    } else if constexpr (GroupIdx == 2) {
+        return to_string_key<StringKey4>(origin);
+    } else if constexpr (GroupIdx == 3) {
+        return to_string_key<StringKey8>(origin);
+    } else if constexpr (GroupIdx == 4) {
+        return to_string_key<StringKey16>(origin);
+    } else {
+        return StringRef(origin); // copy — ms uses StringRef as Key
+    }
+}
+
/// Hash value to feed into the sub-table for a given group.
/// Group 0 holds at most the empty string, so a fixed hash of 0 is used; every
/// other group uses the precomputed per-row hash.
template <int GroupIdx, typename HashValues>
size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
    if constexpr (GroupIdx != 0) {
        return hash_values[row];
    } else {
        return 0;
    }
}
+
/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at most
/// 1 element) does not benefit from prefetch; every other sub-table does.
template <int GroupIdx>
static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set 
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler, 
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t 
count,
+                            HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                            F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler,
+                            ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)(
+                            hash_table.template get_null_key_data<Mapped>());
+                }
+                result_handler(row, hash_table.template 
get_null_key_data<Mapped>());
+                continue;
+            }
+        }
+        auto origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            // Groups 0,5: key and origin are the same StringRef
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            // Groups 1-4: converted_key differs from origin
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+        result_handler(row, result->get_second());
+    }
+}
+
+/// Process one sub-table group for emplace without result_handler (void 
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices, 
size_t count,
+                                 HashMethodType& agg_method, State& state, 
HashMap& hash_table,
+                                 F&& creator, FF&& creator_for_null_key, 
PreHandler&& pre_handler) {
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<false>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        pre_handler(row);
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                bool has_null_key = hash_table.has_null_key_data();
+                hash_table.has_null_key_data() = true;
+                if (!has_null_key) {
+                    std::forward<FF>(creator_for_null_key)();
+                }
+                continue;
+            }
+        }
+        auto origin = agg_method.keys[row];
+        auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+        typename Submap::LookupResult result;
+        if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+            submap.lazy_emplace_with_origin(converted_key, converted_key, 
result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        } else {
+            submap.lazy_emplace_with_origin(converted_key, origin, result,
+                                            
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+                                            std::forward<F>(creator));
+        }
+    }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename 
HashMethodType, typename State,
+          typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+                         HashMethodType& agg_method, State& state, HashMap& 
hash_table,
+                         ResultHandler&& result_handler) {
+    using Mapped = typename HashMethodType::Mapped;
+    using FindResult = typename 
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+    for (size_t j = 0; j < count; j++) {
+        if constexpr (group_needs_prefetch<GroupIdx>) {
+            if (j + HASH_MAP_PREFETCH_DIST < count) {
+                submap.template prefetch<true>(
+                        agg_method.hash_values[indices[j + 
HASH_MAP_PREFETCH_DIST]]);
+            }
+        }
+        uint32_t row = indices[j];
+        if constexpr (is_nullable) {
+            if (state.key_column->is_null_at(row)) {
+                if (hash_table.has_null_key_data()) {
+                    FindResult res(&hash_table.template 
get_null_key_data<Mapped>(), true);
+                    result_handler(row, res);
+                } else {
+                    FindResult res(nullptr, false);
+                    result_handler(row, res);
+                }
+                continue;
+            }
+        }
+        auto converted_key = 
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+        auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+        auto it = submap.find(converted_key, hash);
+        if (it) {
+            FindResult res(&it->get_second(), true);
+            result_handler(row, res);
+        } else {
+            FindResult res(nullptr, false);
+            result_handler(row, res);
+        }
+    }
+}
+
/// Batch emplace helper: for StringHashMap, directly accesses sub-tables bypassing dispatch();
/// for other hash maps, does per-row loop with standard prefetch.
/// pre_handler(row) is called before each emplace, allowing callers to set per-row state
/// (e.g., current row index used inside creator lambdas).
/// result_handler(row_index, mapped) is called after each emplace.
template <typename HashMethodType, typename State, typename F, typename FF, typename PreHandler,
          typename ResultHandler>
void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t num_rows, F&& creator,
                        FF&& creator_for_null_key, PreHandler&& pre_handler,
                        ResultHandler&& result_handler) {
    if constexpr (HashMethodType::is_string_hash_map()) {
        using HashMap = typename HashMethodType::HashMapType;
        // Whether the map is wrapped in DataWithNullKey (nullable grouping key).
        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;

        auto& hash_table = *agg_method.hash_table;
        auto& sht = get_string_hash_table(hash_table);
        // Row indices pre-partitioned by key-length group in init_serialized_keys().
        const auto& groups = agg_method.get_sub_table_groups();

        // Visit each sub-table once; all rows of a group are processed together,
        // so there is no per-row length dispatch.
        sht.visit_submaps([&](auto group_idx, auto& submap) {
            constexpr int G = decltype(group_idx)::value;
            const auto& indices = groups.group_row_indices[G];
            if (!indices.empty()) {
                process_submap_emplace<G, is_nullable>(
                        submap, indices.data(), indices.size(), agg_method, state, hash_table,
                        creator, creator_for_null_key, pre_handler, result_handler);
            }
        });
    } else {
        // Standard per-row loop with ahead prefetch
        for (uint32_t i = 0; i < num_rows; ++i) {
            agg_method.template prefetch<false>(i);
            pre_handler(i);
            result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table, i, agg_method.keys[i],
                                                      agg_method.hash_values[i], creator,
                                                      creator_for_null_key));
        }
    }
}
+
/// Convenience overload without pre_handler: delegates to the full version with a
/// no-op per-row hook.
template <typename HashMethodType, typename State, typename F, typename FF, typename ResultHandler>
void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t num_rows, F&& creator,
                        FF&& creator_for_null_key, ResultHandler&& result_handler) {
    auto noop_pre_handler = [](uint32_t /*row*/) {};
    lazy_emplace_batch(agg_method, state, num_rows, std::forward<F>(creator),
                       std::forward<FF>(creator_for_null_key), noop_pre_handler,
                       std::forward<ResultHandler>(result_handler));
}
+
/// Batch emplace helper (void version): like lazy_emplace_batch but ignores the return value.
/// pre_handler(row) is called before each emplace, allowing callers to update captured state
/// (e.g., the current row index used inside creator lambdas).
template <typename HashMethodType, typename State, typename F, typename FF, typename PreHandler>
void lazy_emplace_batch_void(HashMethodType& agg_method, State& state, uint32_t num_rows,
                             F&& creator, FF&& creator_for_null_key, PreHandler&& pre_handler) {
    if constexpr (HashMethodType::is_string_hash_map()) {
        using HashMap = typename HashMethodType::HashMapType;
        // Whether the map is wrapped in DataWithNullKey (nullable grouping key).
        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;

        auto& hash_table = *agg_method.hash_table;
        auto& sht = get_string_hash_table(hash_table);
        // Row indices pre-partitioned by key-length group in init_serialized_keys().
        const auto& groups = agg_method.get_sub_table_groups();

        sht.visit_submaps([&](auto group_idx, auto& submap) {
            constexpr int G = decltype(group_idx)::value;
            const auto& indices = groups.group_row_indices[G];
            if (!indices.empty()) {
                process_submap_emplace_void<G, is_nullable>(submap, indices.data(), indices.size(),
                                                            agg_method, state, hash_table, creator,
                                                            creator_for_null_key, pre_handler);
            }
        });
    } else {
        // Fallback: per-row emplace with ahead prefetch.
        for (uint32_t i = 0; i < num_rows; ++i) {
            agg_method.template prefetch<false>(i);
            pre_handler(i);
            state.lazy_emplace_key(*agg_method.hash_table, i, agg_method.keys[i],
                                   agg_method.hash_values[i], creator, creator_for_null_key);
        }
    }
}
+
/// Batch find helper: for StringHashMap, directly accesses sub-tables bypassing dispatch();
/// for other hash maps, does per-row loop with standard prefetch.
/// result_handler(row, find_result) is invoked for every row, found or not.
template <typename HashMethodType, typename State, typename ResultHandler>
void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
                ResultHandler&& result_handler) {
    if constexpr (HashMethodType::is_string_hash_map()) {
        using HashMap = typename HashMethodType::HashMapType;
        // Whether the map is wrapped in DataWithNullKey (nullable grouping key).
        constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;

        auto& hash_table = *agg_method.hash_table;
        auto& sht = get_string_hash_table(hash_table);
        // Row indices pre-partitioned by key-length group in init_serialized_keys().
        const auto& groups = agg_method.get_sub_table_groups();

        sht.visit_submaps([&](auto group_idx, auto& submap) {
            constexpr int G = decltype(group_idx)::value;
            const auto& indices = groups.group_row_indices[G];
            if (!indices.empty()) {
                process_submap_find<G, is_nullable>(submap, indices.data(), indices.size(),
                                                    agg_method, state, hash_table, result_handler);
            }
        });
    } else {
        // Fallback: per-row probe with read prefetch.
        for (uint32_t i = 0; i < num_rows; ++i) {
            agg_method.template prefetch<true>(i);
            auto find_result = state.find_key_with_hash(
                    *agg_method.hash_table, i, agg_method.keys[i], agg_method.hash_values[i]);
            result_handler(i, find_result);
        }
    }
}
+
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
 template <typename FieldType, typename TData>
diff --git a/be/src/vec/common/hash_table/string_hash_table.h b/be/src/vec/common/hash_table/string_hash_table.h
index ebde230f1fd..b7f8d707b45 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -673,6 +673,28 @@ public:
     const_iterator cend() const { return end(); }
     iterator end() { return iterator(this, true); }
 
    /// Public accessors for sub-tables, enabling direct batch operations
    /// that bypass dispatch() for better performance (no per-row branching).
    /// m0 holds the empty key; m1..m4 hold fixed-width keys; ms is the generic
    /// long-string table.
    T0& get_submap_m0() { return m0; }
    T1& get_submap_m1() { return m1; }
    T2& get_submap_m2() { return m2; }
    T3& get_submap_m3() { return m3; }
    T4& get_submap_m4() { return m4; }
    Ts& get_submap_ms() { return ms; }
+
+    /// Visit each (group_index, submap) pair with a generic callable.
+    /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+    /// The integral_constant enables compile-time group dispatch in the 
lambda.
+    template <typename Func>
+    ALWAYS_INLINE void visit_submaps(Func&& func) {
+        func(std::integral_constant<int, 0> {}, m0);
+        func(std::integral_constant<int, 1> {}, m1);
+        func(std::integral_constant<int, 2> {}, m2);
+        func(std::integral_constant<int, 3> {}, m3);
+        func(std::integral_constant<int, 4> {}, m4);
+        func(std::integral_constant<int, 5> {}, ms);
+    }
+
     bool add_elem_size_overflow(size_t add_size) const {
         return m1.add_elem_size_overflow(add_size) || 
m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || 
m4.add_elem_size_overflow(add_size) ||
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f6c929b6526..61d1aefb020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -814,6 +814,7 @@ public class Coordinator implements CoordInterface {
             // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
             boolean twoPhaseExecution = fragments.size() > 1;
+            boolean isSingleBackend = addressToBackendID.size() == 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
@@ -844,6 +845,7 @@ public class Coordinator implements CoordInterface {
                     
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
                     
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
                     
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+                    
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
                     
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
 
                     
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), 
pipelineExecContext);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index 1c60b218777..945cf66bbfe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
     public final Supplier<Set<TUniqueId>> instanceIds = 
Suppliers.memoize(this::getInstanceIds);
     public final Supplier<Map<TNetworkAddress, Long>> backends = 
Suppliers.memoize(this::getBackends);
     public final Supplier<Integer> scanRangeNum = 
Suppliers.memoize(this::getScanRangeNum);
+    public final Supplier<Boolean> isSingleBackendQuery = 
Suppliers.memoize(this::computeIsSingleBackendQuery);
     public final Supplier<TNetworkAddress> directConnectFrontendAddress
             = Suppliers.memoize(this::computeDirectConnectCoordinator);
 
@@ -447,6 +448,10 @@ public class CoordinatorContext {
         return scanRangeNum;
     }
 
+    private boolean computeIsSingleBackendQuery() {
+        return backends.get().size() == 1;
+    }
+
     private int computeScanRangeNumByScanRange(TScanRangeParams param) {
         int scanRangeNum = 0;
         TScanRange scanRange = param.getScanRange();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index d6e4b64dc96..13391013f52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -90,6 +90,11 @@ public class ThriftPlansBuilder {
 
         List<PipelineDistributedPlan> distributedPlans = 
coordinatorContext.distributedPlans;
 
+        // Determine whether this query is assigned to a single backend and 
propagate it to
+        // TQueryOptions so that BE can apply more appropriate optimization 
strategies (e.g.
+        // streaming aggregation hash table thresholds).
+        
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
         // we should set runtime predicate first, then we can use heap sort 
and to thrift
         setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index cddef80521c..3217eeacb06 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -431,6 +431,11 @@ struct TQueryOptions {
 
   195: optional bool enable_left_semi_direct_return_opt;
 
+  // Whether all fragments of this query are assigned to a single backend.
+  // When true, the streaming aggregation operator can use more aggressive
+  // hash table expansion thresholds since all data is local.
+  202: optional bool single_backend_query = false;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to