This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch ckb2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c67aa4f3ca029d3597d610b5c68bda7cdca10d03 Author: BiteTheDDDDt <[email protected]> AuthorDate: Thu Mar 5 16:34:33 2026 +0800 adjust single streaming threshold update update --- .../distinct_streaming_aggregation_operator.cpp | 128 ++++++++++++--------- .../distinct_streaming_aggregation_operator.h | 2 + .../operator/streaming_aggregation_operator.cpp | 126 +++++++++++--------- .../exec/operator/streaming_aggregation_operator.h | 1 + be/src/runtime/fragment_mgr.cpp | 5 + be/src/runtime/query_context.h | 8 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 2 + .../org/apache/doris/qe/CoordinatorContext.java | 5 + .../doris/qe/runtime/ThriftPlansBuilder.java | 5 + gensrc/thrift/PaloInternalService.thrift | 5 + 10 files changed, 174 insertions(+), 113 deletions(-) diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp index 48a43ca4c39..d79388c2118 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp @@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0}, }; +static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = { + // Expand up to L2 cache always. + {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0}, + // Expand into L3 cache if we look like we're getting some reduction. + // At present, the L2 cache is generally 256KB or more + {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0}, + // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more + {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0}, +}; + static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); @@ -62,7 +73,8 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta batch_size(state->batch_size()), _agg_data(std::make_unique<DistinctDataVariants>()), _child_block(Block::create_unique()), - _aggregated_block(Block::create_unique()) {} + _aggregated_block(Block::create_unique()), + _is_single_backend(state->get_query_ctx()->is_single_backend_query()) {} Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -98,62 +110,64 @@ bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() { } return std::visit( - Overload { - [&](std::monostate& arg) -> bool { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); - return false; - }, - [&](auto& agg_method) -> bool { - auto& hash_tbl = *agg_method.hash_table; - auto [ht_mem, ht_rows] = - std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; - - // Need some rows in tables to have valid statistics. - if (ht_rows == 0) { - return true; - } - - // Find the appropriate reduction factor in our table for the current hash table sizes. - int cache_level = 0; - while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && - ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) { - ++cache_level; - } - - // Compare the number of rows in the hash table with the number of input rows that - // were aggregated into it. Exclude passed through rows from this calculation since - // they were not in hash tables. 
- const int64_t input_rows = _input_num_rows; - const int64_t aggregated_input_rows = input_rows - _num_rows_returned; - // TODO chenhao - // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; - double current_reduction = static_cast<double>(aggregated_input_rows) / - static_cast<double>(ht_rows); - - // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be - // inaccurate, which could lead to a divide by zero below. - if (aggregated_input_rows <= 0) { - return true; - } - - // Extrapolate the current reduction factor (r) using the formula - // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data - // set, N is the number of input rows, excluding passed-through rows, and n is the - // number of rows inserted or merged into the hash tables. This is a very rough - // approximation but is good enough to be useful. - // TODO: consider collecting more statistics to better estimate reduction. - // double estimated_reduction = aggregated_input_rows >= expected_input_rows - // ? current_reduction - // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); - double min_reduction = - STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction; - - // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); - // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); - // return estimated_reduction > min_reduction; - _should_expand_hash_table = current_reduction > min_reduction; - return _should_expand_hash_table; - }}, + Overload {[&](std::monostate& arg) -> bool { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + return false; + }, + [&](auto& agg_method) -> bool { + auto& hash_tbl = *agg_method.hash_table; + auto [ht_mem, ht_rows] = + std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; + + // Need some rows in tables to have valid statistics. 
+ if (ht_rows == 0) { + return true; + } + + const auto* reduction = _is_single_backend + ? SINGLE_BE_STREAMING_HT_MIN_REDUCTION + : STREAMING_HT_MIN_REDUCTION; + + // Find the appropriate reduction factor in our table for the current hash table sizes. + int cache_level = 0; + while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && + ht_mem >= reduction[cache_level + 1].min_ht_mem) { + ++cache_level; + } + + // Compare the number of rows in the hash table with the number of input rows that + // were aggregated into it. Exclude passed through rows from this calculation since + // they were not in hash tables. + const int64_t input_rows = _input_num_rows; + const int64_t aggregated_input_rows = input_rows - _num_rows_returned; + // TODO chenhao + // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; + double current_reduction = static_cast<double>(aggregated_input_rows) / + static_cast<double>(ht_rows); + + // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be + // inaccurate, which could lead to a divide by zero below. + if (aggregated_input_rows <= 0) { + return true; + } + + // Extrapolate the current reduction factor (r) using the formula + // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data + // set, N is the number of input rows, excluding passed-through rows, and n is the + // number of rows inserted or merged into the hash tables. This is a very rough + // approximation but is good enough to be useful. + // TODO: consider collecting more statistics to better estimate reduction. + // double estimated_reduction = aggregated_input_rows >= expected_input_rows + // ? 
current_reduction + // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); + double min_reduction = reduction[cache_level].streaming_ht_min_reduction; + + // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); + // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); + // return estimated_reduction > min_reduction; + _should_expand_hash_table = current_reduction > min_reduction; + return _should_expand_hash_table; + }}, _agg_data->method_variant); } diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h b/be/src/exec/operator/distinct_streaming_aggregation_operator.h index 392da56e89b..07e015af86e 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h @@ -84,6 +84,8 @@ private: RuntimeProfile::Counter* _hash_table_input_counter = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; + + bool _is_single_backend = false; }; class DistinctStreamingAggOperatorX final diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp b/be/src/exec/operator/streaming_aggregation_operator.cpp index 96d416aeb7e..4458415b1a5 100644 --- a/be/src/exec/operator/streaming_aggregation_operator.cpp +++ b/be/src/exec/operator/streaming_aggregation_operator.cpp @@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry { // of the machine that we're running on. static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { // Expand up to L2 cache always. - {0, 0.0}, + {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0}, // Expand into L3 cache if we look like we're getting some reduction. // At present, The L2 cache is generally 1024k or more - {1024 * 1024, 1.1}, + {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1}, // Expand into main memory if we're getting a significant reduction. 
// The L3 cache is generally 16MB or more - {16 * 1024 * 1024, 2.0}, + {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0}, +}; + +static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = { + // Expand up to L2 cache always. + {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0}, + // Expand into L3 cache if we look like we're getting some reduction. + // At present, the L2 cache is generally 256KB or more + {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0}, + // Expand into main memory if we're getting a significant reduction. + // The L3 cache is generally 16MB or more + {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0}, +}; static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = @@ -81,6 +92,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), _agg_data(std::make_unique<AggregatedDataVariants>()), + _is_single_backend(state->get_query_ctx()->is_single_backend_query()), _child_block(Block::create_unique()), _pre_aggregated_block(Block::create_unique()) {} @@ -220,62 +232,64 @@ bool StreamingAggLocalState::_should_expand_preagg_hash_tables() { } return std::visit( - Overload { - [&](std::monostate& arg) -> bool { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); - return false; - }, - [&](auto& agg_method) -> bool { - auto& hash_tbl = *agg_method.hash_table; - auto [ht_mem, ht_rows] = - std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; - - // Need some rows in tables to have valid statistics.
- if (ht_rows == 0) { - return true; - } + Overload {[&](std::monostate& arg) -> bool { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + return false; + }, + [&](auto& agg_method) -> bool { + auto& hash_tbl = *agg_method.hash_table; + auto [ht_mem, ht_rows] = + std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; - // Find the appropriate reduction factor in our table for the current hash table sizes. - int cache_level = 0; - while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && - ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) { - ++cache_level; - } + // Need some rows in tables to have valid statistics. + if (ht_rows == 0) { + return true; + } - // Compare the number of rows in the hash table with the number of input rows that - // were aggregated into it. Exclude passed through rows from this calculation since - // they were not in hash tables. - const int64_t input_rows = _input_num_rows; - const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned; - // TODO chenhao - // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; - double current_reduction = static_cast<double>(aggregated_input_rows) / - static_cast<double>(ht_rows); - - // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be - // inaccurate, which could lead to a divide by zero below. - if (aggregated_input_rows <= 0) { - return true; - } + const auto* reduction = _is_single_backend + ? SINGLE_BE_STREAMING_HT_MIN_REDUCTION + : STREAMING_HT_MIN_REDUCTION; - // Extrapolate the current reduction factor (r) using the formula - // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data - // set, N is the number of input rows, excluding passed-through rows, and n is the - // number of rows inserted or merged into the hash tables. This is a very rough - // approximation but is good enough to be useful. 
- // TODO: consider collecting more statistics to better estimate reduction. - // double estimated_reduction = aggregated_input_rows >= expected_input_rows - // ? current_reduction - // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); - double min_reduction = - STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction; - - // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); - // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); - // return estimated_reduction > min_reduction; - _should_expand_hash_table = current_reduction > min_reduction; - return _should_expand_hash_table; - }}, + // Find the appropriate reduction factor in our table for the current hash table sizes. + int cache_level = 0; + while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && + ht_mem >= reduction[cache_level + 1].min_ht_mem) { + ++cache_level; + } + + // Compare the number of rows in the hash table with the number of input rows that + // were aggregated into it. Exclude passed through rows from this calculation since + // they were not in hash tables. + const int64_t input_rows = _input_num_rows; + const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned; + // TODO chenhao + // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; + double current_reduction = static_cast<double>(aggregated_input_rows) / + static_cast<double>(ht_rows); + + // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be + // inaccurate, which could lead to a divide by zero below. + if (aggregated_input_rows <= 0) { + return true; + } + + // Extrapolate the current reduction factor (r) using the formula + // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data + // set, N is the number of input rows, excluding passed-through rows, and n is the + // number of rows inserted or merged into the hash tables. 
This is a very rough + // approximation but is good enough to be useful. + // TODO: consider collecting more statistics to better estimate reduction. + // double estimated_reduction = aggregated_input_rows >= expected_input_rows + // ? current_reduction + // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); + double min_reduction = reduction[cache_level].streaming_ht_min_reduction; + + // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); + // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); + // return estimated_reduction > min_reduction; + _should_expand_hash_table = current_reduction > min_reduction; + return _should_expand_hash_table; + }}, _agg_data->method_variant); } diff --git a/be/src/exec/operator/streaming_aggregation_operator.h b/be/src/exec/operator/streaming_aggregation_operator.h index f72d6dce24a..abc905cbfbd 100644 --- a/be/src/exec/operator/streaming_aggregation_operator.h +++ b/be/src/exec/operator/streaming_aggregation_operator.h @@ -110,6 +110,7 @@ private: std::vector<uint8_t> cmp_res; std::vector<int> order_directions; std::vector<int> null_directions; + bool _is_single_backend = false; struct HeapLimitCursor { HeapLimitCursor(int row_id, MutableColumns& limit_columns, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b8298174f3e..10c4a7a96b6 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -877,6 +877,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<QueryContext> query_ctx; RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, query_ctx)); SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx()); + // Set single_backend_query before prepare() so that pipeline local states + // (e.g. StreamingAggLocalState) can read the correct value in their constructors. 
+ query_ctx->set_single_backend_query(params.__isset.query_options && + params.query_options.__isset.single_backend_query && + params.query_options.single_backend_query); int64_t duration_ns = 0; std::shared_ptr<PipelineFragmentContext> context = std::make_shared<PipelineFragmentContext>( query_ctx->query_id(), params, query_ctx, _exec_env, cb, diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 1326d0bdfbe..aa1746ed8b0 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -107,6 +107,12 @@ public: return _query_watcher.elapsed_time_seconds(now) > _timeout_second; } + bool is_single_backend_query() const { return _is_single_backend_query; } + + void set_single_backend_query(bool is_single_backend_query) { + _is_single_backend_query = is_single_backend_query; + } + int64_t get_remaining_query_time_seconds() const { timespec now; clock_gettime(CLOCK_MONOTONIC, &now); @@ -381,6 +387,8 @@ private: std::string _load_error_url; std::string _first_error_msg; + bool _is_single_backend_query = false; + // file cache context holders std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> _query_context_holders; // instance id + node id -> cte scan diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d0f002a3f15..f0705f8e195 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface { // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not // send data until ExchangeNode is ready to receive. 
boolean twoPhaseExecution = fragments.size() > 1; + boolean isSingleBackend = addressToBackendID.size() == 1; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); @@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface { entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address)); entry.getValue().setBackendId(pipelineExecContext.backend.getId()); entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution); + entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend); entry.getValue().setFragmentId(fragment.getFragmentId().asInt()); pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), pipelineExecContext); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java index f3c825cf9d8..f1a124f487c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java @@ -103,6 +103,7 @@ public class CoordinatorContext { public final Supplier<Set<TUniqueId>> instanceIds = Suppliers.memoize(this::getInstanceIds); public final Supplier<Map<TNetworkAddress, Long>> backends = Suppliers.memoize(this::getBackends); public final Supplier<Integer> scanRangeNum = Suppliers.memoize(this::getScanRangeNum); + public final Supplier<Boolean> isSingleBackendQuery = Suppliers.memoize(this::computeIsSingleBackendQuery); public final Supplier<TNetworkAddress> directConnectFrontendAddress = Suppliers.memoize(this::computeDirectConnectCoordinator); @@ -447,6 +448,10 @@ public class CoordinatorContext { return scanRangeNum; } + private boolean computeIsSingleBackendQuery() { + return backends.get().size() == 1; + } + private int computeScanRangeNumByScanRange(TScanRangeParams param) { int scanRangeNum = 0; TScanRange scanRange = param.getScanRange(); diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 6ffa8de61a4..0738876282f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -106,6 +106,11 @@ public class ThriftPlansBuilder { Set<Integer> fragmentToNotifyClose = setParamsForRecursiveCteNode(distributedPlans, coordinatorContext.runtimeFilters); + // Determine whether this query is assigned to a single backend and propagate it to + // TQueryOptions so that BE can apply more appropriate optimization strategies (e.g. + // streaming aggregation hash table thresholds). + coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get()); + // we should set runtime predicate first, then we can use heap sort and to thrift setRuntimePredicateIfNeed(coordinatorContext.scanNodes); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e495d529ef6..495c1477647 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -440,6 +440,11 @@ struct TQueryOptions { // Use paimon-cpp to read Paimon splits on BE 201: optional bool enable_paimon_cpp_reader = false; + // Whether all fragments of this query are assigned to a single backend. + // When true, the (distinct) streaming aggregation operators require a higher + // minimum reduction ratio before expanding their pre-aggregation hash tables. + 202: optional bool single_backend_query = false; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
