This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 21e21f5e3b4 [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIVE_PASSTHROUGH SINK. (#44925) (#44971)
21e21f5e3b4 is described below
commit 21e21f5e3b48fe8cbd4a2453e6cf6cbfd9b6edd5
Author: Mryange <[email protected]>
AuthorDate: Fri Dec 6 09:57:31 2024 +0800
[opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIVE_PASSTHROUGH SINK. (#44925) (#44971)
https://github.com/apache/doris/pull/44925
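Before:
```
op -> local sink(1) -> local source(n)
```
Now:
```
op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source(n)
```
A hash-shuffle or ADAPTIVE_PASSTHROUGH local sink does relatively heavy work. With a single sink instance feeding n source instances, that sink became the bottleneck. A cheap PASSTHROUGH exchange is now inserted first, so the heavy sink runs with n-way concurrency.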
Profile:
```
Pipeline : 1(instance_num=3):
AGGREGATION_SINK_OPERATOR (id=4 , nereids_id=255):
CROSS_JOIN_OPERATOR (id=3 , nereids_id=245):
LOCAL_EXCHANGE_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5):
Pipeline : 2(instance_num=3):
LOCAL_EXCHANGE_SINK_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5):
LOCAL_EXCHANGE_OPERATOR (PASSTHROUGH) (id=-6):
Pipeline : 3(instance_num=1):
LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-6):
OLAP_SCAN_OPERATOR (id=2. nereids_id=234. table name = nums1(nums1)):
```
---
be/src/pipeline/pipeline.h | 8 ++++++++
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 +++++++---
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index db93e8dfe35..8743cdfb57e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -122,6 +122,14 @@ public:
return idx == ExchangeType::HASH_SHUFFLE || idx ==
ExchangeType::BUCKET_HASH_SHUFFLE;
}
+ // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
+ // data is processed and shuffled on the sink.
+ // Compared to PASSTHROUGH, this is a relatively heavy operation.
+ static bool heavy_operations_on_the_sink(ExchangeType idx) {
+ return idx == ExchangeType::HASH_SHUFFLE || idx ==
ExchangeType::BUCKET_HASH_SHUFFLE ||
+ idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
+ }
+
bool need_to_local_exchange(const DataDistribution
target_data_distribution) const {
if (target_data_distribution.distribution_type !=
ExchangeType::BUCKET_HASH_SHUFFLE &&
target_data_distribution.distribution_type !=
ExchangeType::HASH_SHUFFLE) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 860ba31097a..8c2075497a9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -945,7 +945,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
break;
case ExchangeType::ADAPTIVE_PASSTHROUGH:
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
- cur_pipe->num_tasks(), _num_instances,
+ std::max(cur_pipe->num_tasks(), _num_instances),
_num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -1047,9 +1047,13 @@ Status PipelineXFragmentContext::_add_local_exchange(
<< " cur_pipe->operator_xs().size(): " <<
cur_pipe->operator_xs().size()
<< " new_pip->operator_xs().size(): " <<
new_pip->operator_xs().size();
- // Add passthrough local exchanger if necessary
+ // There are some local shuffles with relatively heavy operations on the
sink.
+ // If the local sink concurrency is 1 and the local source concurrency is
n, the sink becomes a bottleneck.
+ // Therefore, local passthrough is used to increase the concurrency of the
sink.
+ // op -> local sink(1) -> local source (n)
+ // op -> local passthrough(1) -> local passthrough(n) -> local sink(n) ->
local source (n)
if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
- Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
+
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
RETURN_IF_ERROR(_add_local_exchange_impl(
new_pip->operator_xs().size(), pool, new_pip,
add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH),
do_local_exchange, num_buckets,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]