This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f0f4caebd678ee8d6efedd6b1a70a4b76ac1c167 Author: Mryange <[email protected]> AuthorDate: Fri Mar 13 13:04:49 2026 +0800 [pick](code) pick "adjust some local shuffle rules" and "[conf](passthrough) add ForcePassthrough SessionVariable" (#61275) ### What problem does this PR solve? https://github.com/apache/doris/pull/59366 https://github.com/apache/doris/pull/60671 Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --------- Co-authored-by: Pxl <[email protected]> --- .../exec/distinct_streaming_aggregation_operator.h | 7 ++++++- be/src/pipeline/exec/hashjoin_probe_operator.h | 24 ++++++++++++++-------- be/src/pipeline/exec/operator.h | 2 ++ .../pipeline/exec/streaming_aggregation_operator.h | 4 ++++ be/src/runtime/runtime_state.h | 15 ++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 20 ++++++++++++++++++ gensrc/thrift/PaloInternalService.thrift | 6 ++++++ 7 files changed, 69 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 5a084823756..d4f7a08136b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -125,7 +125,12 @@ public: ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } - return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state); + if (state->enable_distinct_streaming_agg_force_passthrough()) { + return {ExchangeType::PASSTHROUGH}; + } else { + return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution( + state); + } } bool is_colocated_operator() const override { return _is_colocate; } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index c7f4b7d0c59..6be1df67fe6 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -139,20 +139,28 @@ public: bool* eos) const override; bool need_more_input_data(RuntimeState* state) const override; - DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + DataDistribution required_data_distribution(RuntimeState* state) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; + } else if (_is_broadcast_join) { + if (state->enable_broadcast_join_force_passthrough()) { + return DataDistribution(ExchangeType::PASSTHROUGH); + } else { + return _child && _child->is_serial_operator() + ? DataDistribution(ExchangeType::PASSTHROUGH) + : DataDistribution(ExchangeType::NOOP); + } } - return _is_broadcast_join - ? DataDistribution(ExchangeType::PASSTHROUGH) - : (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || - _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); + + return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || + _join_distribution == TJoinDistributionType::COLOCATE + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); } bool is_broadcast_join() const { return _is_broadcast_join; } + bool is_hash_join_probe() const override { return true; } + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED || _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 875d9ce18ef..a003ac1316d 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -137,6 +137,8 @@ public: return Status::OK(); } + virtual bool is_hash_join_probe() const { return false; } + /** * Pipeline task is blockable means it will be blocked in the next run. So we should put the * pipeline task into the blocking task scheduler. diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 513cfec5b60..d5b09c7eb25 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -152,6 +152,10 @@ public: _spill_streaming_agg_mem_limit = 1024 * 1024; } DataDistribution required_data_distribution(RuntimeState* state) const override { + if (_child && _child->is_hash_join_probe() && + state->enable_streaming_agg_hash_join_force_passthrough()) { + return DataDistribution(ExchangeType::PASSTHROUGH); + } if (!state->get_query_ctx()->should_be_shuffled_agg( StatefulOperatorX<StreamingAggLocalState>::node_id())) { return StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(state); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 5860c6744d1..255baf931a2 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -535,6 +535,21 @@ public: return _query_options.__isset.enable_profile && _query_options.enable_profile; } + bool enable_streaming_agg_hash_join_force_passthrough() const { + return _query_options.__isset.enable_streaming_agg_hash_join_force_passthrough && + _query_options.enable_streaming_agg_hash_join_force_passthrough; + } + + bool enable_distinct_streaming_agg_force_passthrough() const { + return _query_options.__isset.enable_distinct_streaming_agg_force_passthrough && + _query_options.enable_distinct_streaming_agg_force_passthrough; + } + + bool enable_broadcast_join_force_passthrough() const { + return _query_options.__isset.enable_broadcast_join_force_passthrough && + _query_options.enable_broadcast_join_force_passthrough; + } + int rpc_verbose_profile_max_instance_count() const { return _query_options.__isset.rpc_verbose_profile_max_instance_count ? _query_options.rpc_verbose_profile_max_instance_count diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 6b831e23c45..3c4f92c332b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -163,6 +163,11 @@ public class SessionVariable implements Serializable, Writable { public static final String BROKER_LOAD_BATCH_SIZE = "broker_load_batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation"; + public static final String ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH = + "enable_streaming_agg_hash_join_force_passthrough"; + public static final String ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH = + "enable_distinct_streaming_agg_force_passthrough"; + public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = "enable_broadcast_join_force_passthrough"; public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan"; public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; @@ -1231,6 +1236,15 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGGREGATION, fuzzy = true) public boolean enableDistinctStreamingAggregation = true; + @VariableMgr.VarAttr(name = ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH, fuzzy = true) + public boolean enableStreamingAggHashJoinForcePassthrough = true; + + @VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH, fuzzy = true) + public boolean enableDistinctStreamingAggForcePassthrough = true; + + @VariableMgr.VarAttr(name = ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH, fuzzy = true) + public boolean enableBroadcastJoinForcePassthrough = false; + @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN) public boolean disableColocatePlan = false; @@ -3303,6 +3317,9 @@ public class SessionVariable implements Serializable, Writable { this.useSerialExchange = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); this.disableStreamPreaggregations = random.nextBoolean(); + this.enableStreamingAggHashJoinForcePassthrough = random.nextBoolean(); + this.enableDistinctStreamingAggForcePassthrough = random.nextBoolean(); + this.enableBroadcastJoinForcePassthrough = random.nextBoolean(); this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); this.shortCircuitEvaluation = random.nextBoolean(); @@ -4960,6 +4977,9 @@ public class SessionVariable implements Serializable, Writable { tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation); + tResult.setEnableStreamingAggHashJoinForcePassthrough(enableStreamingAggHashJoinForcePassthrough); + tResult.setEnableDistinctStreamingAggForcePassthrough(enableDistinctStreamingAggForcePassthrough); + tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough); tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions); tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 56cc2822e84..cddef80521c 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -423,6 +423,12 @@ struct TQueryOptions { 179: optional bool enable_parquet_filter_by_bloom_filter = true; + 186: optional bool enable_streaming_agg_hash_join_force_passthrough; + + 187: optional bool enable_distinct_streaming_agg_force_passthrough; + + 188: optional bool enable_broadcast_join_force_passthrough; + 195: optional bool enable_left_semi_direct_return_opt; // For cloud, to control if the content would be written into file cache --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
