This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f0f4caebd678ee8d6efedd6b1a70a4b76ac1c167
Author: Mryange <[email protected]>
AuthorDate: Fri Mar 13 13:04:49 2026 +0800

    [pick](code) pick "adjust some local shuffle rules" and 
"[conf](passthrough) add ForcePassthrough SessionVariable" (#61275)
    
    ### What problem does this PR solve?
    
    https://github.com/apache/doris/pull/59366
    https://github.com/apache/doris/pull/60671
    
    
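    Problem Summary: Cherry-picks #59366 and #60671 into branch-4.1. A
    broadcast hash join probe now requires PASSTHROUGH local exchange only
    when its child is a serial operator (NOOP otherwise). Three new session
    variables are added and forwarded to the BE via TQueryOptions.
    enable_streaming_agg_hash_join_force_passthrough (default true) makes a
    streaming aggregation directly above a hash join probe require
    PASSTHROUGH. enable_distinct_streaming_agg_force_passthrough (default
    true) makes a non-shuffled distinct streaming aggregation require
    PASSTHROUGH. enable_broadcast_join_force_passthrough (default false)
    forces PASSTHROUGH for broadcast join probes regardless of the child.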
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ---------
    
    Co-authored-by: Pxl <[email protected]>
---
 .../exec/distinct_streaming_aggregation_operator.h |  7 ++++++-
 be/src/pipeline/exec/hashjoin_probe_operator.h     | 24 ++++++++++++++--------
 be/src/pipeline/exec/operator.h                    |  2 ++
 .../pipeline/exec/streaming_aggregation_operator.h |  4 ++++
 be/src/runtime/runtime_state.h                     | 15 ++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 20 ++++++++++++++++++
 gensrc/thrift/PaloInternalService.thrift           |  6 ++++++
 7 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 5a084823756..d4f7a08136b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -125,7 +125,12 @@ public:
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         }
-        return 
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state);
+        if (state->enable_distinct_streaming_agg_force_passthrough()) {
+            return {ExchangeType::PASSTHROUGH};
+        } else {
+            return 
StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(
+                    state);
+        }
     }
 
     bool is_colocated_operator() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index c7f4b7d0c59..6be1df67fe6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -139,20 +139,28 @@ public:
                 bool* eos) const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
+        } else if (_is_broadcast_join) {
+            if (state->enable_broadcast_join_force_passthrough()) {
+                return DataDistribution(ExchangeType::PASSTHROUGH);
+            } else {
+                return _child && _child->is_serial_operator()
+                               ? DataDistribution(ExchangeType::PASSTHROUGH)
+                               : DataDistribution(ExchangeType::NOOP);
+            }
         }
-        return _is_broadcast_join
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : (_join_distribution == 
TJoinDistributionType::BUCKET_SHUFFLE ||
-                                          _join_distribution == 
TJoinDistributionType::COLOCATE
-                                  ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                                     _partition_exprs)
-                                  : 
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
+
+        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+                                _join_distribution == 
TJoinDistributionType::COLOCATE
+                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
+                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs));
     }
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
+    bool is_hash_join_probe() const override { return true; }
+
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED ||
                _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 875d9ce18ef..a003ac1316d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -137,6 +137,8 @@ public:
         return Status::OK();
     }
 
+    virtual bool is_hash_join_probe() const { return false; }
+
     /**
      * Pipeline task is blockable means it will be blocked in the next run. So 
we should put the
      * pipeline task into the blocking task scheduler.
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 513cfec5b60..d5b09c7eb25 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -152,6 +152,10 @@ public:
         _spill_streaming_agg_mem_limit = 1024 * 1024;
     }
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
+        if (_child && _child->is_hash_join_probe() &&
+            state->enable_streaming_agg_hash_join_force_passthrough()) {
+            return DataDistribution(ExchangeType::PASSTHROUGH);
+        }
         if (!state->get_query_ctx()->should_be_shuffled_agg(
                     StatefulOperatorX<StreamingAggLocalState>::node_id())) {
             return 
StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(state);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5860c6744d1..255baf931a2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -535,6 +535,21 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    bool enable_streaming_agg_hash_join_force_passthrough() const {
+        return 
_query_options.__isset.enable_streaming_agg_hash_join_force_passthrough &&
+               _query_options.enable_streaming_agg_hash_join_force_passthrough;
+    }
+
+    bool enable_distinct_streaming_agg_force_passthrough() const {
+        return 
_query_options.__isset.enable_distinct_streaming_agg_force_passthrough &&
+               _query_options.enable_distinct_streaming_agg_force_passthrough;
+    }
+
+    bool enable_broadcast_join_force_passthrough() const {
+        return _query_options.__isset.enable_broadcast_join_force_passthrough 
&&
+               _query_options.enable_broadcast_join_force_passthrough;
+    }
+
     int rpc_verbose_profile_max_instance_count() const {
         return _query_options.__isset.rpc_verbose_profile_max_instance_count
                        ? _query_options.rpc_verbose_profile_max_instance_count
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6b831e23c45..3c4f92c332b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -163,6 +163,11 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String BROKER_LOAD_BATCH_SIZE = 
"broker_load_batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = 
"disable_streaming_preaggregations";
     public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = 
"enable_distinct_streaming_aggregation";
+    public static final String 
ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH =
+            "enable_streaming_agg_hash_join_force_passthrough";
+    public static final String ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH 
=
+            "enable_distinct_streaming_agg_force_passthrough";
+    public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = 
"enable_broadcast_join_force_passthrough";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = 
"enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = 
"parallel_fragment_exec_instance_num";
@@ -1231,6 +1236,15 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGGREGATION, fuzzy = 
true)
     public boolean enableDistinctStreamingAggregation = true;
 
+    @VariableMgr.VarAttr(name = 
ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH, fuzzy = true)
+    public boolean enableStreamingAggHashJoinForcePassthrough = true;
+
+    @VariableMgr.VarAttr(name = 
ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH, fuzzy = true)
+    public boolean enableDistinctStreamingAggForcePassthrough = true;
+
+    @VariableMgr.VarAttr(name = ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH, fuzzy 
= true)
+    public boolean enableBroadcastJoinForcePassthrough = false;
+
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
     public boolean disableColocatePlan = false;
 
@@ -3303,6 +3317,9 @@ public class SessionVariable implements Serializable, 
Writable {
         this.useSerialExchange = random.nextBoolean();
         this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
+        this.enableStreamingAggHashJoinForcePassthrough = random.nextBoolean();
+        this.enableDistinctStreamingAggForcePassthrough = random.nextBoolean();
+        this.enableBroadcastJoinForcePassthrough = random.nextBoolean();
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         this.shortCircuitEvaluation = random.nextBoolean();
 
@@ -4960,6 +4977,9 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setBatchSize(batchSize);
         tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
         
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
+        
tResult.setEnableStreamingAggHashJoinForcePassthrough(enableStreamingAggHashJoinForcePassthrough);
+        
tResult.setEnableDistinctStreamingAggForcePassthrough(enableDistinctStreamingAggForcePassthrough);
+        
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
         tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
         
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 56cc2822e84..cddef80521c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -423,6 +423,12 @@ struct TQueryOptions {
 
   179: optional bool enable_parquet_filter_by_bloom_filter = true;
 
+  186: optional bool enable_streaming_agg_hash_join_force_passthrough;
+
+  187: optional bool enable_distinct_streaming_agg_force_passthrough;
+
+  188: optional bool enable_broadcast_join_force_passthrough;
+
   195: optional bool enable_left_semi_direct_return_opt;
 
   // For cloud, to control if the content would be written into file cache


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to