This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 24fd23bb95f [pick](code) pick "adjust some local shuffle rules" and
"[conf](passthrough) add ForcePassthrough SessionVariable" (#61275)
24fd23bb95f is described below
commit 24fd23bb95f4ca51190a2c314f83b71693a7b0b2
Author: Mryange <[email protected]>
AuthorDate: Fri Mar 13 13:04:49 2026 +0800
[pick](code) pick "adjust some local shuffle rules" and
"[conf](passthrough) add ForcePassthrough SessionVariable" (#61275)
### What problem does this PR solve?
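Cherry-picked from:
- https://github.com/apache/doris/pull/59366
- https://github.com/apache/doris/pull/60671
Problem Summary: Adjusts the local-shuffle (local exchange) rules in the BE pipeline and adds session variables to force PASSTHROUGH:
- Broadcast hash join probe: now requires PASSTHROUGH only when its child is a serial operator (NOOP otherwise). PASSTHROUGH is always used when `enable_broadcast_join_force_passthrough` is set (FE default: false). NULL_AWARE_LEFT_ANTI_JOIN still requires NOOP.
- Streaming aggregation whose direct child is a hash join probe: requires PASSTHROUGH when `enable_streaming_agg_hash_join_force_passthrough` is set (FE default: true).
- Distinct streaming aggregation: when it would otherwise fall back to the default distribution, it requires PASSTHROUGH if `enable_distinct_streaming_agg_force_passthrough` is set (FE default: true).
- Adds the three session variables above. They are fuzzy-randomized in fuzzy mode and passed to BE as TQueryOptions fields 186, 187 and 188.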
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Co-authored-by: Pxl <[email protected]>
---
.../exec/distinct_streaming_aggregation_operator.h | 7 ++++++-
be/src/pipeline/exec/hashjoin_probe_operator.h | 24 ++++++++++++++--------
be/src/pipeline/exec/operator.h | 2 ++
.../pipeline/exec/streaming_aggregation_operator.h | 4 ++++
be/src/runtime/runtime_state.h | 15 ++++++++++++++
.../java/org/apache/doris/qe/SessionVariable.java | 20 ++++++++++++++++++
gensrc/thrift/PaloInternalService.thrift | 6 ++++++
7 files changed, 69 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 5a084823756..d4f7a08136b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -125,7 +125,12 @@ public:
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
- return
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state);
+ if (state->enable_distinct_streaming_agg_force_passthrough()) {
+ return {ExchangeType::PASSTHROUGH};
+ } else {
+ return
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(
+ state);
+ }
}
bool is_colocated_operator() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 6c5232b3720..ceabc6cb8b6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -130,20 +130,28 @@ public:
bool* eos) const override;
bool need_more_input_data(RuntimeState* state) const override;
- DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
+ DataDistribution required_data_distribution(RuntimeState* state) const
override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
+ } else if (_is_broadcast_join) {
+ if (state->enable_broadcast_join_force_passthrough()) {
+ return DataDistribution(ExchangeType::PASSTHROUGH);
+ } else {
+ return _child && _child->is_serial_operator()
+ ? DataDistribution(ExchangeType::PASSTHROUGH)
+ : DataDistribution(ExchangeType::NOOP);
+ }
}
- return _is_broadcast_join
- ? DataDistribution(ExchangeType::PASSTHROUGH)
- : (_join_distribution ==
TJoinDistributionType::BUCKET_SHUFFLE ||
- _join_distribution ==
TJoinDistributionType::COLOCATE
- ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
- _partition_exprs)
- :
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
+
+ return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+ _join_distribution ==
TJoinDistributionType::COLOCATE
+ ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_partition_exprs)
+ : DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs));
}
bool is_broadcast_join() const { return _is_broadcast_join; }
+ bool is_hash_join_probe() const override { return true; }
+
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 875d9ce18ef..a003ac1316d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -137,6 +137,8 @@ public:
return Status::OK();
}
+ virtual bool is_hash_join_probe() const { return false; }
+
/**
* Pipeline task is blockable means it will be blocked in the next run. So
we should put the
* pipeline task into the blocking task scheduler.
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 513cfec5b60..d5b09c7eb25 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -152,6 +152,10 @@ public:
_spill_streaming_agg_mem_limit = 1024 * 1024;
}
DataDistribution required_data_distribution(RuntimeState* state) const
override {
+ if (_child && _child->is_hash_join_probe() &&
+ state->enable_streaming_agg_hash_join_force_passthrough()) {
+ return DataDistribution(ExchangeType::PASSTHROUGH);
+ }
if (!state->get_query_ctx()->should_be_shuffled_agg(
StatefulOperatorX<StreamingAggLocalState>::node_id())) {
return
StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(state);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5860c6744d1..255baf931a2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -535,6 +535,21 @@ public:
return _query_options.__isset.enable_profile &&
_query_options.enable_profile;
}
+ bool enable_streaming_agg_hash_join_force_passthrough() const {
+ return
_query_options.__isset.enable_streaming_agg_hash_join_force_passthrough &&
+ _query_options.enable_streaming_agg_hash_join_force_passthrough;
+ }
+
+ bool enable_distinct_streaming_agg_force_passthrough() const {
+ return
_query_options.__isset.enable_distinct_streaming_agg_force_passthrough &&
+ _query_options.enable_distinct_streaming_agg_force_passthrough;
+ }
+
+ bool enable_broadcast_join_force_passthrough() const {
+ return _query_options.__isset.enable_broadcast_join_force_passthrough
&&
+ _query_options.enable_broadcast_join_force_passthrough;
+ }
+
int rpc_verbose_profile_max_instance_count() const {
return _query_options.__isset.rpc_verbose_profile_max_instance_count
? _query_options.rpc_verbose_profile_max_instance_count
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 21c10e06ff3..c576d6758fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -163,6 +163,11 @@ public class SessionVariable implements Serializable,
Writable {
public static final String BROKER_LOAD_BATCH_SIZE =
"broker_load_batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS =
"disable_streaming_preaggregations";
public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION =
"enable_distinct_streaming_aggregation";
+ public static final String
ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH =
+ "enable_streaming_agg_hash_join_force_passthrough";
+ public static final String ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH
=
+ "enable_distinct_streaming_agg_force_passthrough";
+ public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH =
"enable_broadcast_join_force_passthrough";
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
public static final String ENABLE_BUCKET_SHUFFLE_JOIN =
"enable_bucket_shuffle_join";
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM =
"parallel_fragment_exec_instance_num";
@@ -1228,6 +1233,15 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGGREGATION, fuzzy =
true)
public boolean enableDistinctStreamingAggregation = true;
+ @VariableMgr.VarAttr(name =
ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH, fuzzy = true)
+ public boolean enableStreamingAggHashJoinForcePassthrough = true;
+
+ @VariableMgr.VarAttr(name =
ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH, fuzzy = true)
+ public boolean enableDistinctStreamingAggForcePassthrough = true;
+
+ @VariableMgr.VarAttr(name = ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH, fuzzy
= true)
+ public boolean enableBroadcastJoinForcePassthrough = false;
+
@VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
public boolean disableColocatePlan = false;
@@ -3294,6 +3308,9 @@ public class SessionVariable implements Serializable,
Writable {
this.useSerialExchange = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
+ this.enableStreamingAggHashJoinForcePassthrough = random.nextBoolean();
+ this.enableDistinctStreamingAggForcePassthrough = random.nextBoolean();
+ this.enableBroadcastJoinForcePassthrough = random.nextBoolean();
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
this.shortCircuitEvaluation = random.nextBoolean();
@@ -4951,6 +4968,9 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
+
tResult.setEnableStreamingAggHashJoinForcePassthrough(enableStreamingAggHashJoinForcePassthrough);
+
tResult.setEnableDistinctStreamingAggForcePassthrough(enableDistinctStreamingAggForcePassthrough);
+
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 56cc2822e84..cddef80521c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -423,6 +423,12 @@ struct TQueryOptions {
179: optional bool enable_parquet_filter_by_bloom_filter = true;
+ 186: optional bool enable_streaming_agg_hash_join_force_passthrough;
+
+ 187: optional bool enable_distinct_streaming_agg_force_passthrough;
+
+ 188: optional bool enable_broadcast_join_force_passthrough;
+
195: optional bool enable_left_semi_direct_return_opt;
// For cloud, to control if the content would be written into file cache
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]