This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 67a5ca1b6be [Chore](pick) pick changes from PR #61104 and PR #60941
(#61303)
67a5ca1b6be is described below
commit 67a5ca1b6bebcf0a890132b78b9271b3c00062ab
Author: Pxl <[email protected]>
AuthorDate: Fri Mar 13 21:27:52 2026 +0800
[Chore](pick) pick changes from PR #61104 and PR #60941 (#61303)
pick changes from PR #61104 and PR #60941
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 56 ++--
.../pipeline/exec/aggregation_source_operator.cpp | 7 +-
.../distinct_streaming_aggregation_operator.cpp | 44 +--
.../exec/distinct_streaming_aggregation_operator.h | 3 +-
be/src/pipeline/exec/streaming_agg_min_reduction.h | 76 +++++
.../exec/streaming_aggregation_operator.cpp | 64 +---
.../pipeline/exec/streaming_aggregation_operator.h | 2 +
be/src/runtime/fragment_mgr.cpp | 5 +
be/src/runtime/query_context.h | 8 +
be/src/vec/common/hash_table/hash_map_context.h | 369 ++++++++++++++++++++-
be/src/vec/common/hash_table/string_hash_table.h | 22 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +
.../org/apache/doris/qe/CoordinatorContext.java | 5 +
.../doris/qe/runtime/ThriftPlansBuilder.java | 5 +
gensrc/thrift/PaloInternalService.thrift | 5 +
15 files changed, 547 insertions(+), 126 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index f0f74bc2592..aca5c500d19 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -575,10 +575,9 @@ void
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state, i,
creator,
-
creator_for_null_key);
- }
+ vectorized::lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
}},
@@ -660,10 +659,10 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state, i,
creator,
-
creator_for_null_key);
- }
+ vectorized::lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row) { i = row; },
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
return true;
}
@@ -675,27 +674,26 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr*
places,
vectorized::ColumnRawPtrs&
key_columns,
uint32_t num_rows) {
- std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
- throw
doris::Exception(ErrorCode::INTERNAL_ERROR,
- "uninited hash
table");
- },
- [&](auto& agg_method) -> void {
- using HashMethodType =
std::decay_t<decltype(agg_method)>;
- using AggState = typename
HashMethodType::State;
- AggState state(key_columns);
-
agg_method.init_serialized_keys(key_columns, num_rows);
-
- /// For all rows.
- for (size_t i = 0; i < num_rows; ++i)
{
- auto find_result =
agg_method.find(state, i);
-
- if (find_result.is_found()) {
- places[i] =
find_result.get_mapped();
- } else {
- places[i] = nullptr;
- }
- }
- }},
+ std::visit(vectorized::Overload {
+ [&](std::monostate& arg) -> void {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ },
+ [&](auto& agg_method) -> void {
+ using HashMethodType =
std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns);
+ agg_method.init_serialized_keys(key_columns,
num_rows);
+
+ /// For all rows.
+ vectorized::find_batch(agg_method, state, num_rows,
+ [&](uint32_t row, auto&
find_result) {
+ if
(find_result.is_found()) {
+ places[row] =
find_result.get_mapped();
+ } else {
+ places[row] =
nullptr;
+ }
+ });
+ }},
_agg_data->method_variant);
}
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index b8ecd137016..a8062b3c2f5 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -590,10 +590,9 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state, i,
creator,
-
creator_for_null_key);
- }
+ vectorized::lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) { places[row]
= mapped; });
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 972fc9ba923..3fed13adfbc 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "pipeline/exec/streaming_agg_min_reduction.h"
#include "vec/exprs/vectorized_agg_fn.h"
namespace doris {
@@ -32,29 +33,6 @@ class RuntimeState;
namespace doris::pipeline {
#include "common/compile_check_begin.h"
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- // At present, The L2 cache is generally 1024k or more
- {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
- // Expand into main memory if we're getting a significant reduction.
- // The L3 cache is generally 16MB or more
- {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
OperatorXBase*
parent)
@@ -62,7 +40,8 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
batch_size(state->batch_size()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_child_block(vectorized::Block::create_unique()),
- _aggregated_block(vectorized::Block::create_unique()) {}
+ _aggregated_block(vectorized::Block::create_unique()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +92,14 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction =
+ _is_single_backend ?
doris::SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
doris::STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ while (cache_level + 1 <
doris::STREAMING_HT_MIN_REDUCTION_SIZE &&
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -145,8 +128,7 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
@@ -310,9 +292,9 @@ void
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
auto creator_for_null_key = [&]() {
distinct_row.push_back(row); };
SCOPED_TIMER(_hash_table_emplace_timer);
- for (; row < num_rows; ++row) {
- agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
- }
+ vectorized::lazy_emplace_batch_void(agg_method, state,
num_rows, creator,
+
creator_for_null_key,
+ [&](uint32_t r) {
row = r; });
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
}},
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d4f7a08136b..8b7898857d6 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -87,6 +87,7 @@ private:
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+ bool _is_single_backend = false;
};
class DistinctStreamingAggOperatorX final
@@ -157,4 +158,4 @@ private:
} // namespace pipeline
} // namespace doris
-#include "common/compile_check_end.h"
\ No newline at end of file
+#include "common/compile_check_end.h"
diff --git a/be/src/pipeline/exec/streaming_agg_min_reduction.h
b/be/src/pipeline/exec/streaming_agg_min_reduction.h
new file mode 100644
index 00000000000..df2acc66ec8
--- /dev/null
+++ b/be/src/pipeline/exec/streaming_agg_min_reduction.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris {
+
+/// The minimum reduction factor (input rows divided by output rows) to grow
hash tables
+/// in a streaming preaggregation, given that the hash tables are currently
the given
+/// size or above. The sizes roughly correspond to hash table sizes where the
bucket
+/// arrays will fit in a cache level. Intuitively, we don't want the working
set of the
+/// aggregation to expand to the next level of cache unless we're reducing the
input
+/// enough to outweigh the increased memory latency we'll incur for each hash
table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of
the
+/// final reduction. It may be biased either way depending on the ordering of
the
+/// input. If the input order is random, we will underestimate the final
reduction
+/// factor because the probability of a row having the same key as a previous
row
+/// increases as more input is processed. If the input order is correlated
with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we
underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final
reduction
+/// using the planner's estimated input cardinality and the assumption that
input
+/// is in a random order. This means that we assume that the reduction factor
will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+ // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
+ // bytes is greater than this threshold.
+ int min_ht_mem;
+ // The minimum reduction factor to expand the hash tables.
+ double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the
cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+    // At present, the L2 cache is generally 256k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+    // At present, the L2 cache is generally 256k or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 4.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 5.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+ sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+} // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 6c0c412f819..9ff1461d0d6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -25,6 +25,7 @@
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/streaming_agg_min_reduction.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vslot_ref.h"
@@ -35,54 +36,13 @@ class RuntimeState;
} // namespace doris
namespace doris::pipeline {
-/// The minimum reduction factor (input rows divided by output rows) to grow
hash tables
-/// in a streaming preaggregation, given that the hash tables are currently
the given
-/// size or above. The sizes roughly correspond to hash table sizes where the
bucket
-/// arrays will fit in a cache level. Intuitively, we don't want the working
set of the
-/// aggregation to expand to the next level of cache unless we're reducing the
input
-/// enough to outweigh the increased memory latency we'll incur for each hash
table
-/// lookup.
-///
-/// Note that the current reduction achieved is not always a good estimate of
the
-/// final reduction. It may be biased either way depending on the ordering of
the
-/// input. If the input order is random, we will underestimate the final
reduction
-/// factor because the probability of a row having the same key as a previous
row
-/// increases as more input is processed. If the input order is correlated
with the
-/// key, skew may bias the estimate. If high cardinality keys appear first, we
-/// may overestimate and if low cardinality keys appear first, we
underestimate.
-/// To estimate the eventual reduction achieved, we estimate the final
reduction
-/// using the planner's estimated input cardinality and the assumption that
input
-/// is in a random order. This means that we assume that the reduction factor
will
-/// increase over time.
-struct StreamingHtMinReductionEntry {
- // Use 'streaming_ht_min_reduction' if the total size of hash table bucket
directories in
- // bytes is greater than this threshold.
- int min_ht_mem;
- // The minimum reduction factor to expand the hash tables.
- double streaming_ht_min_reduction;
-};
-
-// TODO: experimentally tune these values and also programmatically get the
cache size
-// of the machine that we're running on.
-static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
- // Expand up to L2 cache always.
- {0, 0.0},
- // Expand into L3 cache if we look like we're getting some reduction.
- // At present, The L2 cache is generally 1024k or more
- {1024 * 1024, 1.1},
- // Expand into main memory if we're getting a significant reduction.
- // The L3 cache is generally 16MB or more
- {16 * 1024 * 1024, 2.0},
-};
-
-static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
- sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
_child_block(vectorized::Block::create_unique()),
- _pre_aggregated_block(vectorized::Block::create_unique()) {}
+ _pre_aggregated_block(vectorized::Block::create_unique()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info)
{
RETURN_IF_ERROR(Base::init(state, info));
@@ -236,10 +196,14 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction =
+ _is_single_backend ?
doris::SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
doris::STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
- while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ while (cache_level + 1 <
doris::STREAMING_HT_MIN_REDUCTION_SIZE &&
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -268,8 +232,7 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
@@ -806,10 +769,9 @@ void
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
};
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- places[i] = *agg_method.lazy_emplace(state, i,
creator,
-
creator_for_null_key);
- }
+ vectorized::lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t row, auto& mapped) {
places[row] = mapped; });
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
}},
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index a9e8cc54ba8..3632bea9430 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -199,6 +199,8 @@ private:
}},
_agg_data->method_variant);
}
+
+ bool _is_single_backend = false;
};
class StreamingAggOperatorX MOCK_REMOVE(final) : public
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6bf60dbdfc5..6cd0ee1c7c1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -865,6 +865,11 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source,
query_ctx));
SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
+ // Set single_backend_query before prepare() so that pipeline local states
+ // (e.g. StreamingAggLocalState) can read the correct value in their
constructors.
+ query_ctx->set_single_backend_query(params.__isset.query_options &&
+
params.query_options.__isset.single_backend_query &&
+
params.query_options.single_backend_query);
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6ec5664f8c0..7e086635c9d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -107,6 +107,12 @@ public:
return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}
+ bool is_single_backend_query() const { return _is_single_backend_query; }
+
+ void set_single_backend_query(bool is_single_backend_query) {
+ _is_single_backend_query = is_single_backend_query;
+ }
+
int64_t get_remaining_query_time_seconds() const {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
@@ -374,6 +380,8 @@ private:
std::string _load_error_url;
std::string _first_error_msg;
+ bool _is_single_backend_query = false;
+
// file cache context holders
std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr>
_query_context_holders;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h
b/be/src/vec/common/hash_table/hash_map_context.h
index e86381ba3b8..1854209b282 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -112,7 +112,7 @@ struct MethodBaseInner {
}
template <bool read>
- ALWAYS_INLINE void prefetch(size_t i) {
+ void prefetch(size_t i) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
hash_table->template prefetch<read>(keys[i +
HASH_MAP_PREFETCH_DIST],
hash_values[i +
HASH_MAP_PREFETCH_DIST]);
@@ -120,19 +120,14 @@ struct MethodBaseInner {
}
template <typename State>
- ALWAYS_INLINE auto find(State& state, size_t i) {
- if constexpr (!is_string_hash_map()) {
- prefetch<true>(i);
- }
+ auto find(State& state, size_t i) {
+ prefetch<true>(i);
return state.find_key_with_hash(*hash_table, i, keys[i],
hash_values[i]);
}
template <typename State, typename F, typename FF>
- ALWAYS_INLINE auto lazy_emplace(State& state, size_t i, F&& creator,
- FF&& creator_for_null_key) {
- if constexpr (!is_string_hash_map()) {
- prefetch<false>(i);
- }
+ auto lazy_emplace(State& state, size_t i, F&& creator, FF&&
creator_for_null_key) {
+ prefetch<false>(i);
return state.lazy_emplace_key(*hash_table, i, keys[i], hash_values[i],
creator,
creator_for_null_key);
}
@@ -292,6 +287,56 @@ struct MethodSerialized : public MethodBase<TData> {
}
};
+/// Sub-table group indices for StringHashTable batch operations.
+/// StringHashTable dispatches keys to 6 sub-tables by string length:
+/// group 0: empty strings (size == 0) → m0
+/// group 1: size <= 2 → m1
+/// group 2: size <= 4 → m2
+/// group 3: size <= 8 → m3
+/// group 4: size <= 16 → m4
+/// group 5: size > 16 or trailing zero → ms
+/// By pre-grouping row indices, we can process each sub-table in a batch,
+/// achieving better cache locality and enabling prefetch within each group.
+struct StringKeySubTableGroups {
+ static constexpr int NUM_GROUPS = 6;
+ // Row indices for each sub-table group
+ DorisVector<uint32_t> group_row_indices[NUM_GROUPS];
+
+ void build(const StringRef* keys, uint32_t num_rows) {
+ for (int g = 0; g < NUM_GROUPS; g++) {
+ group_row_indices[g].clear();
+ }
+ // First pass: count sizes for each group to reserve memory
+ uint32_t counts[NUM_GROUPS] = {};
+ for (uint32_t i = 0; i < num_rows; i++) {
+ counts[get_group(keys[i])]++;
+ }
+ for (int g = 0; g < NUM_GROUPS; g++) {
+ group_row_indices[g].reserve(counts[g]);
+ }
+ // Second pass: fill group indices
+ for (uint32_t i = 0; i < num_rows; i++) {
+ group_row_indices[get_group(keys[i])].push_back(i);
+ }
+ }
+
+ static ALWAYS_INLINE int get_group(const StringRef& key) {
+ const size_t sz = key.size;
+ if (sz == 0) {
+ return 0;
+ }
+ if (key.data[sz - 1] == 0) {
+ // Trailing zero: goes to the generic long-string table (ms)
+ return 5;
+ }
+ if (sz <= 2) return 1;
+ if (sz <= 4) return 2;
+ if (sz <= 8) return 3;
+ if (sz <= 16) return 4;
+ return 5;
+ }
+};
+
template <typename TData>
struct MethodStringNoCache : public MethodBase<TData> {
using Base = MethodBase<TData>;
@@ -305,6 +350,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
// refresh each time probe
DorisVector<StringRef> _stored_keys;
+ // Sub-table groups for batch operations (only used for non-join
aggregation path)
+ StringKeySubTableGroups _sub_table_groups;
+
size_t serialized_keys_size(bool is_build) const override {
return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
: (_stored_keys.size() * sizeof(StringRef));
@@ -357,6 +405,10 @@ struct MethodStringNoCache : public MethodBase<TData> {
Base::init_join_bucket_num(num_rows, bucket_size, null_map);
} else {
Base::init_hash_values(num_rows, null_map);
+ // Build sub-table groups for batch emplace/find (only for
aggregation, not join)
+ if constexpr (Base::is_string_hash_map()) {
+ _sub_table_groups.build(Base::keys, num_rows);
+ }
}
}
@@ -365,8 +417,305 @@ struct MethodStringNoCache : public MethodBase<TData> {
key_columns[0]->reserve(num_rows);
key_columns[0]->insert_many_strings(input_keys.data(), num_rows);
}
+
+ const StringKeySubTableGroups& get_sub_table_groups() const { return
_sub_table_groups; }
+};
+
+/// Helper: detect whether HashMap is a nullable-wrapped StringHashMap.
+template <typename HashMap>
+struct IsNullableStringHashMap : std::false_type {};
+
+template <typename Mapped, typename Allocator>
+struct IsNullableStringHashMap<DataWithNullKey<StringHashMap<Mapped,
Allocator>>> : std::true_type {
};
+/// Helper: get the underlying StringHashTable from a hash table (handles
DataWithNullKey wrapper).
+template <typename HashMap>
+auto& get_string_hash_table(HashMap& data) {
+ return data;
+}
+
+/// Compile-time key conversion for each sub-table group.
+/// Groups 1-4 use to_string_key<T>(); groups 0 and 5 use StringRef directly.
+/// Returns the converted key for the given group.
+/// For groups 0 and 5, the key is returned as a non-const copy
(lazy_emplace_if_zero takes Key&).
+template <int GroupIdx>
+auto convert_key_for_submap(const StringRef& origin) {
+ if constexpr (GroupIdx == 0) {
+ return StringRef(origin); // copy — m0 needs non-const Key&
+ } else if constexpr (GroupIdx == 1) {
+ return to_string_key<StringKey2>(origin);
+ } else if constexpr (GroupIdx == 2) {
+ return to_string_key<StringKey4>(origin);
+ } else if constexpr (GroupIdx == 3) {
+ return to_string_key<StringKey8>(origin);
+ } else if constexpr (GroupIdx == 4) {
+ return to_string_key<StringKey16>(origin);
+ } else {
+ return StringRef(origin); // copy — ms uses StringRef as Key
+ }
+}
+
+/// Hash value to use for a given group. Group 0 (empty string) always uses
hash=0.
+template <int GroupIdx, typename HashValues>
+size_t hash_for_group(const HashValues& hash_values, uint32_t row) {
+ if constexpr (GroupIdx == 0) {
+ return 0;
+ } else {
+ return hash_values[row];
+ }
+}
+
+/// Whether prefetch is useful for a group. Group 0 (StringHashTableEmpty, at
most 1 element)
+/// does not benefit from prefetch.
+template <int GroupIdx>
+static constexpr bool group_needs_prefetch = (GroupIdx != 0);
+
+/// Process one sub-table group for emplace with result_handler.
+/// Handles nullable null-key check, prefetch, key conversion, and emplace.
+/// pre_handler(row) is called before each emplace, allowing callers to set
per-row state
+/// (e.g., current row index used inside creator lambdas).
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename F, typename FF, typename PreHandler,
typename ResultHandler>
+void process_submap_emplace(Submap& submap, const uint32_t* indices, size_t
count,
+ HashMethodType& agg_method, State& state, HashMap&
hash_table,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler,
+ ResultHandler&& result_handler) {
+ using Mapped = typename HashMethodType::Mapped;
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<false>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ pre_handler(row);
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ bool has_null_key = hash_table.has_null_key_data();
+ hash_table.has_null_key_data() = true;
+ if (!has_null_key) {
+ std::forward<FF>(creator_for_null_key)(
+ hash_table.template get_null_key_data<Mapped>());
+ }
+ result_handler(row, hash_table.template
get_null_key_data<Mapped>());
+ continue;
+ }
+ }
+ auto origin = agg_method.keys[row];
+ auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+ typename Submap::LookupResult result;
+ if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+ // Groups 0,5: key and origin are the same StringRef
+ submap.lazy_emplace_with_origin(converted_key, converted_key,
result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ } else {
+ // Groups 1-4: converted_key differs from origin
+ submap.lazy_emplace_with_origin(converted_key, origin, result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ }
+ result_handler(row, result->get_second());
+ }
+}
+
+/// Process one sub-table group for emplace without result_handler (void
version).
+/// pre_handler(row) is called before each emplace.
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename F, typename FF, typename PreHandler>
+void process_submap_emplace_void(Submap& submap, const uint32_t* indices,
size_t count,
+ HashMethodType& agg_method, State& state,
HashMap& hash_table,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler) {
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<false>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ pre_handler(row);
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ bool has_null_key = hash_table.has_null_key_data();
+ hash_table.has_null_key_data() = true;
+ if (!has_null_key) {
+ std::forward<FF>(creator_for_null_key)();
+ }
+ continue;
+ }
+ }
+ auto origin = agg_method.keys[row];
+ auto converted_key = convert_key_for_submap<GroupIdx>(origin);
+ typename Submap::LookupResult result;
+ if constexpr (GroupIdx == 0 || GroupIdx == 5) {
+ submap.lazy_emplace_with_origin(converted_key, converted_key,
result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ } else {
+ submap.lazy_emplace_with_origin(converted_key, origin, result,
+
hash_for_group<GroupIdx>(agg_method.hash_values, row),
+ std::forward<F>(creator));
+ }
+ }
+}
+
+/// Process one sub-table group for find with result_handler.
+template <int GroupIdx, bool is_nullable, typename Submap, typename
HashMethodType, typename State,
+ typename HashMap, typename ResultHandler>
+void process_submap_find(Submap& submap, const uint32_t* indices, size_t count,
+ HashMethodType& agg_method, State& state, HashMap&
hash_table,
+ ResultHandler&& result_handler) {
+ using Mapped = typename HashMethodType::Mapped;
+ using FindResult = typename
ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped>;
+ for (size_t j = 0; j < count; j++) {
+ if constexpr (group_needs_prefetch<GroupIdx>) {
+ if (j + HASH_MAP_PREFETCH_DIST < count) {
+ submap.template prefetch<true>(
+ agg_method.hash_values[indices[j +
HASH_MAP_PREFETCH_DIST]]);
+ }
+ }
+ uint32_t row = indices[j];
+ if constexpr (is_nullable) {
+ if (state.key_column->is_null_at(row)) {
+ if (hash_table.has_null_key_data()) {
+ FindResult res(&hash_table.template
get_null_key_data<Mapped>(), true);
+ result_handler(row, res);
+ } else {
+ FindResult res(nullptr, false);
+ result_handler(row, res);
+ }
+ continue;
+ }
+ }
+ auto converted_key =
convert_key_for_submap<GroupIdx>(agg_method.keys[row]);
+ auto hash = hash_for_group<GroupIdx>(agg_method.hash_values, row);
+ auto it = submap.find(converted_key, hash);
+ if (it) {
+ FindResult res(&it->get_second(), true);
+ result_handler(row, res);
+ } else {
+ FindResult res(nullptr, false);
+ result_handler(row, res);
+ }
+ }
+}
+
+/// Batch emplace helper: for StringHashMap, directly accesses sub-tables
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+/// pre_handler(row) is called before each emplace, allowing callers to set
per-row state
+/// (e.g., current row index used inside creator lambdas).
+/// result_handler(row_index, mapped) is called after each emplace.
+template <typename HashMethodType, typename State, typename F, typename FF,
typename PreHandler,
+ typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t
num_rows, F&& creator,
+ FF&& creator_for_null_key, PreHandler&& pre_handler,
+ ResultHandler&& result_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_emplace<G, is_nullable>(
+ submap, indices.data(), indices.size(), agg_method,
state, hash_table,
+ creator, creator_for_null_key, pre_handler,
result_handler);
+ }
+ });
+ } else {
+ // Standard per-row loop with ahead prefetch
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<false>(i);
+ pre_handler(i);
+ result_handler(i, *state.lazy_emplace_key(*agg_method.hash_table,
i, agg_method.keys[i],
+
agg_method.hash_values[i], creator,
+ creator_for_null_key));
+ }
+ }
+}
+
+/// Convenience overload without pre_handler (a no-op pre_handler is supplied).
+template <typename HashMethodType, typename State, typename F, typename FF,
typename ResultHandler>
+void lazy_emplace_batch(HashMethodType& agg_method, State& state, uint32_t
num_rows, F&& creator,
+ FF&& creator_for_null_key, ResultHandler&&
result_handler) {
+ lazy_emplace_batch(
+ agg_method, state, num_rows, std::forward<F>(creator),
+ std::forward<FF>(creator_for_null_key), [](uint32_t) {},
+ std::forward<ResultHandler>(result_handler));
+}
+
+/// Batch emplace helper (void version): like lazy_emplace_batch but with no
result_handler, so the emplaced mapped values are not reported back.
+/// pre_handler(row) is called before each emplace, allowing callers to update
captured state
+/// (e.g., the current row index used inside creator lambdas).
+template <typename HashMethodType, typename State, typename F, typename FF,
typename PreHandler>
+void lazy_emplace_batch_void(HashMethodType& agg_method, State& state,
uint32_t num_rows,
+ F&& creator, FF&& creator_for_null_key,
PreHandler&& pre_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_emplace_void<G, is_nullable>(submap,
indices.data(), indices.size(),
+ agg_method, state,
hash_table, creator,
+
creator_for_null_key, pre_handler);
+ }
+ });
+ } else {
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<false>(i);
+ pre_handler(i);
+ state.lazy_emplace_key(*agg_method.hash_table, i,
agg_method.keys[i],
+ agg_method.hash_values[i], creator,
creator_for_null_key);
+ }
+ }
+}
+
+/// Batch find helper: for StringHashMap, directly accesses sub-tables
bypassing dispatch();
+/// for other hash maps, does per-row loop with standard prefetch.
+template <typename HashMethodType, typename State, typename ResultHandler>
+void find_batch(HashMethodType& agg_method, State& state, uint32_t num_rows,
+ ResultHandler&& result_handler) {
+ if constexpr (HashMethodType::is_string_hash_map()) {
+ using HashMap = typename HashMethodType::HashMapType;
+ constexpr bool is_nullable = IsNullableStringHashMap<HashMap>::value;
+
+ auto& hash_table = *agg_method.hash_table;
+ auto& sht = get_string_hash_table(hash_table);
+ const auto& groups = agg_method.get_sub_table_groups();
+
+ sht.visit_submaps([&](auto group_idx, auto& submap) {
+ constexpr int G = decltype(group_idx)::value;
+ const auto& indices = groups.group_row_indices[G];
+ if (!indices.empty()) {
+ process_submap_find<G, is_nullable>(submap, indices.data(),
indices.size(),
+ agg_method, state,
hash_table, result_handler);
+ }
+ });
+ } else {
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ agg_method.template prefetch<true>(i);
+ auto find_result = state.find_key_with_hash(
+ *agg_method.hash_table, i, agg_method.keys[i],
agg_method.hash_values[i]);
+ result_handler(i, find_result);
+ }
+ }
+}
+
/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData>
diff --git a/be/src/vec/common/hash_table/string_hash_table.h
b/be/src/vec/common/hash_table/string_hash_table.h
index ebde230f1fd..b7f8d707b45 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -673,6 +673,28 @@ public:
const_iterator cend() const { return end(); }
iterator end() { return iterator(this, true); }
+ /// Public accessors for sub-tables, enabling direct batch operations
+ /// that bypass dispatch() for better performance (no per-row branching).
+ T0& get_submap_m0() { return m0; }
+ T1& get_submap_m1() { return m1; }
+ T2& get_submap_m2() { return m2; }
+ T3& get_submap_m3() { return m3; }
+ T4& get_submap_m4() { return m4; }
+ Ts& get_submap_ms() { return ms; }
+
+ /// Visit each (group_index, submap) pair with a generic callable.
+ /// Func signature: func(std::integral_constant<int, GroupIdx>, Submap&)
+ /// The integral_constant enables compile-time group dispatch in the
lambda.
+ template <typename Func>
+ ALWAYS_INLINE void visit_submaps(Func&& func) {
+ func(std::integral_constant<int, 0> {}, m0);
+ func(std::integral_constant<int, 1> {}, m1);
+ func(std::integral_constant<int, 2> {}, m2);
+ func(std::integral_constant<int, 3> {}, m3);
+ func(std::integral_constant<int, 4> {}, m4);
+ func(std::integral_constant<int, 5> {}, ms);
+ }
+
bool add_elem_size_overflow(size_t add_size) const {
return m1.add_elem_size_overflow(add_size) ||
m2.add_elem_size_overflow(add_size) ||
m3.add_elem_size_overflow(add_size) ||
m4.add_elem_size_overflow(add_size) ||
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f6c929b6526..61d1aefb020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -814,6 +814,7 @@ public class Coordinator implements CoordInterface {
// TableValuedFunctionScanNode, we should ensure
TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() > 1;
+ boolean isSingleBackend = addressToBackendID.size() == 1;
for (PlanFragment fragment : fragments) {
FragmentExecParams params =
fragmentExecParamsMap.get(fragment.getFragmentId());
@@ -844,6 +845,7 @@ public class Coordinator implements CoordInterface {
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId),
pipelineExecContext);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index 1c60b218777..945cf66bbfe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
public final Supplier<Set<TUniqueId>> instanceIds =
Suppliers.memoize(this::getInstanceIds);
public final Supplier<Map<TNetworkAddress, Long>> backends =
Suppliers.memoize(this::getBackends);
public final Supplier<Integer> scanRangeNum =
Suppliers.memoize(this::getScanRangeNum);
+ public final Supplier<Boolean> isSingleBackendQuery =
Suppliers.memoize(this::computeIsSingleBackendQuery);
public final Supplier<TNetworkAddress> directConnectFrontendAddress
= Suppliers.memoize(this::computeDirectConnectCoordinator);
@@ -447,6 +448,10 @@ public class CoordinatorContext {
return scanRangeNum;
}
+ private boolean computeIsSingleBackendQuery() {
+ return backends.get().size() == 1;
+ }
+
private int computeScanRangeNumByScanRange(TScanRangeParams param) {
int scanRangeNum = 0;
TScanRange scanRange = param.getScanRange();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index d6e4b64dc96..13391013f52 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -90,6 +90,11 @@ public class ThriftPlansBuilder {
List<PipelineDistributedPlan> distributedPlans =
coordinatorContext.distributedPlans;
+ // Determine whether this query is assigned to a single backend and
propagate it to
+ // TQueryOptions so that BE can apply more appropriate optimization
strategies (e.g.
+ // streaming aggregation hash table thresholds).
+
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
// we should set runtime predicate first, then we can use heap sort
and to thrift
setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index cddef80521c..3217eeacb06 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -431,6 +431,11 @@ struct TQueryOptions {
195: optional bool enable_left_semi_direct_return_opt;
+ // Whether all fragments of this query are assigned to a single backend.
+ // When true, the streaming aggregation operator can use more aggressive
+ // hash table expansion thresholds since all data is local.
+ 202: optional bool single_backend_query = false;
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]