This is an automated email from the ASF dual-hosted git repository.
924060929 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e5bbebb4905 [opt](local shuffle) bucket-shuffle dest spreading + bucket-to-hash parallelism upgrade (#64793)
e5bbebb4905 is described below
commit e5bbebb49058a998f8e3a5a380d49508c1bc4a2c
Author: 924060929
AuthorDate: Tue Jun 30 10:33:27 2026 +0800
[opt](local shuffle) bucket-shuffle dest spreading + bucket-to-hash parallelism upgrade (#64793)
Related PR: #63366 (FE local-shuffle planner, merged)
Problem Summary:
Two optimizations on top of the FE local-shuffle planner (#63366):
removing the bucket-shuffle serial bottleneck and adding a
bucket-to-hash parallelism upgrade.
---
#### Part 1: ExchangeNode serial decoupling + bucket-shuffle dest spreading
When the FE planner is enabled, `ExchangeNode.isSerialOperatorOnBe` no
longer inherits `fragment.hasSerialScanNode()`. Serial status comes
from the node itself only. This allows `DistributePlanner` to spread
bucket-shuffle destinations to all bucket-owning instances (via
`assignedJoinBucketIndexes`) instead of funneling every bucket to the
first instance on each worker. The spread applies only to non-serial
exchanges. A serial exchange still expects funnel destinations.
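The destination change amounts to building one destination slot per bucket from each instance's assigned join buckets. The Java sketch below is a simplified model, not the patch's code. Instances are plain strings, and `spreadByJoinBuckets`, `funnel`, and `NO_OWNER` are hypothetical names. The patch's `sortDestinationInstancesByJoinBuckets` fills unowned slots with a dummy job instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical model of bucket-indexed destination spreading; not a Doris API. */
final class BucketDestSpread {
    // Stand-in for a bucket that no instance owns (the patch uses a dummy job here).
    static final String NO_OWNER = "<dummy>";

    // Returns dests where dests.get(b) receives every row routed to bucket b.
    static List<String> spreadByJoinBuckets(Map<String, List<Integer>> assignedBuckets, int bucketNum) {
        String[] dests = new String[bucketNum];
        for (Map.Entry<String, List<Integer>> e : assignedBuckets.entrySet()) {
            for (int bucket : e.getValue()) {
                // Each join bucket must be owned by exactly one instance.
                if (dests[bucket] != null) {
                    throw new IllegalStateException("bucket " + bucket + " assigned to both "
                            + dests[bucket] + " and " + e.getKey());
                }
                dests[bucket] = e.getKey();
            }
        }
        for (int b = 0; b < bucketNum; b++) {
            if (dests[b] == null) {
                dests[b] = NO_OWNER;
            }
        }
        return Arrays.asList(dests);
    }

    // Old behavior for comparison: every bucket of a worker goes to its first instance.
    static List<String> funnel(String firstInstance, int bucketNum) {
        String[] dests = new String[bucketNum];
        Arrays.fill(dests, firstInstance);
        return Arrays.asList(dests);
    }
}
```

Suppose 4 instances each own one bucket. The spread sends bucket `b` to instance `b`, while the funnel sends all four buckets to the first instance.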
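This also includes a BE-side fix for orphan instances, meaning instances
that own no bucket and so never get a sender channel or EOS. Their
receivers are created with `num_senders=0`, so they are ready
immediately at EOS instead of blocking forever. Orphan detection uses
the per-BE local task index, not the global instance index.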
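Here is a minimal Java model of the orphan check. It assumes the exchange is already known to be bucket-shuffle partitioned. `OrphanCheck`, `isOrphan`, and `receiverSenders` are hypothetical names. The real check is `ExchangeSourceOperatorX::is_bucket_shuffle_orphan_instance` on BE.

The model also shows why the index must be per-BE. Take a second BE whose bucket-map values are local indexes 0 and 1. A global index such as 4 would wrongly mark an owning instance as an orphan.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of receiver-side orphan detection; not the BE API. */
final class OrphanCheck {
    // bucketSeqToInstanceIdx: this BE's bucket -> PER-BE local instance index map.
    static boolean isOrphan(Map<Integer, Integer> bucketSeqToInstanceIdx, int localTaskIdx,
                            boolean serialExchange) {
        // Serial exchanges funnel to one task per worker, so bucket ownership does not apply.
        if (serialExchange || bucketSeqToInstanceIdx.isEmpty()) {
            return false;
        }
        Set<Integer> owners = new HashSet<>(bucketSeqToInstanceIdx.values());
        return !owners.contains(localTaskIdx);
    }

    // Orphans open their receiver with zero senders, so they reach EOS immediately.
    static int receiverSenders(int plannedSenders, Map<Integer, Integer> bucketSeqToInstanceIdx,
                               int localTaskIdx, boolean serialExchange) {
        return isOrphan(bucketSeqToInstanceIdx, localTaskIdx, serialExchange) ? 0 : plannedSenders;
    }
}
```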
```sql
-- orders: DISTRIBUTED BY HASH(o_custkey) BUCKETS 4
-- customer: DISTRIBUTED BY HASH(c_custkey) BUCKETS 4
SELECT c_name, SUM(o_totalprice) total
FROM orders JOIN customer ON o_custkey = c_custkey
GROUP BY c_name ORDER BY total DESC;
```
Baseline (planner=false, BE native) — no LocalExchangeNode:
```
4:AGG(update serialize, STREAMING)
3:HASH JOIN (BUCKET_SHUFFLE)
|----1:EXCHANGE ← customer, bucket-shuffle partitioned
2:OlapScan(orders) ← pooling scan, 16 instances but only 4 have buckets
```
FE planner with dest spreading (ratio=0, no upgrade) — bucket
distribution preserved, dests spread to all 4 bucket-owning instances:
```
4:AGG(update serialize, STREAMING)
12:LOCAL-EXCHANGE(PASSTHROUGH) ← fan serial→parallel for agg
3:HASH JOIN (BUCKET_SHUFFLE)
|----1:EXCHANGE ← customer, dests spread to 4 bucket-owning instances
11:LOCAL-EXCHANGE(BUCKET_HASH) ← ★ bucket LE: join capped at bucket count (4)
10:LOCAL-EXCHANGE(PASSTHROUGH) ← fan serial scan → parallel
2:OlapScan(orders, POOLING-SCAN)
```
---
#### Part 2: Bucket-to-hash parallelism upgrade + RF force_local_merge
When per-BE instances significantly exceed the bucket count, upgrade
`BUCKET_HASH_SHUFFLE` local exchanges to `LOCAL_EXECUTION_HASH_SHUFFLE`
so the join runs at full instance parallelism. The gate is:
`min(instances, executor_threads) / min(buckets, executor_threads) > ratio`
It is evaluated per BE, and every BE that owns buckets must pass.
`ratio` is the session variable `local_shuffle_bucket_upgrade_ratio`
(default 1.5; values <= 1 disable the upgrade).

The default `bucket_shuffle_downgrade_ratio` is 0.8. With that value, a
bucket-shuffle join is kept only when buckets >= 0.8 × instances, i.e.
instances/buckets <= 1.25. So by default the upgrade stays off for
single joins and engages only for stacked or wide bucket-domain shapes.
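A few numbers make the gate and its interaction with the downgrade rule concrete. This Java sketch is illustrative only. `BucketUpgradeGate`, `Worker`, and its fields are hypothetical names. The downgrade predicate is simplified to a single partition; the planner actually compares pruned partitions × buckets against the total instance count.

```java
import java.util.List;

/** Hypothetical model of the Part 2 gate; names are illustrative, not Doris APIs. */
final class BucketUpgradeGate {
    static final class Worker {
        final int instances;        // instances of this fragment on the BE
        final int bucketsWithData;  // distinct join buckets assigned on the BE
        final int executorThreads;  // BE pipeline executor threads
        Worker(int instances, int bucketsWithData, int executorThreads) {
            this.instances = instances;
            this.bucketsWithData = bucketsWithData;
            this.executorThreads = executorThreads;
        }
    }

    // ratio <= 1 disables; otherwise effective instances must exceed effective buckets x ratio.
    static boolean passes(double ratio, Worker w) {
        if (ratio <= 1.0) {
            return false;
        }
        int effInstances = Math.min(w.instances, w.executorThreads);
        int effBuckets = Math.min(w.bucketsWithData, w.executorThreads);
        return effBuckets > 0 && effInstances > effBuckets * ratio;
    }

    // Conservative: every BE that owns buckets must pass, and at least one must own buckets.
    static boolean eligible(double ratio, List<Worker> workers) {
        boolean anyBuckets = false;
        for (Worker w : workers) {
            if (w.bucketsWithData == 0) {
                continue;
            }
            anyBuckets = true;
            if (!passes(ratio, w)) {
                return false;
            }
        }
        return anyBuckets;
    }

    // Bucket shuffle falls back to a shuffle join when buckets < instances x downgradeRatio.
    static boolean downgradesToShuffle(double downgradeRatio, int totalBuckets, int totalInstances) {
        return downgradeRatio > 0 && totalBuckets < totalInstances * downgradeRatio;
    }
}
```

Take 16 instances and 4 buckets on a BE with 32 executor threads. Since 16 > 4 × 1.5, the upgrade fires. With only 4 threads, both sides are capped at 4, so it does not fire.

Now take 13 buckets. The bucket join survives the 0.8 downgrade check (13 >= 12.8), but 16 > 13 × 1.5 fails. This is why a single join stays un-upgraded by default.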
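RF `force_local_merge` fix: the bucket upgrade flips the scan from
serial to parallel. That removes the implicit signal that the runtime
filter's partial results must be merged. To restore it, this change
adds `TRuntimeFilterDesc.force_local_merge`.

After `AddLocalExchange`, FE walks the path from each RF builder to its
target. If a `LocalExchangeNode` sits on that path, the target must
merge partial RFs; on BE, such a consumer registers with the global RF
manager.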
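The path check can be sketched as a walk up the parent chain from the RF target to the builder join. This hypothetical Java model assumes the target sits in the same fragment below the builder. `RfPathCheck` and its `Node` class stand in for the FE plan tree and are not Doris classes. `needMerge` mirrors the boolean shape of the condition in `RuntimeState::register_consumer_runtime_filter`.

```java
/** Hypothetical model of the force_local_merge decision; not Doris classes. */
final class RfPathCheck {
    static final class Node {
        final String name;
        final boolean localExchange;
        final Node parent;
        Node(String name, boolean localExchange, Node parent) {
            this.name = name;
            this.localExchange = localExchange;
            this.parent = parent;
        }
    }

    // Walk from the RF target up to the builder join. Any LocalExchangeNode in between
    // means the target sees partial filters that must be merged.
    static boolean forceLocalMerge(Node target, Node builder) {
        boolean sawLocalExchange = false;
        for (Node n = target.parent; n != builder; n = n.parent) {
            if (n == null) {
                throw new IllegalArgumentException(builder.name + " is not an ancestor of " + target.name);
            }
            sawLocalExchange |= n.localExchange;
        }
        return sawLocalExchange;
    }

    // Same boolean shape as the BE-side need_merge condition.
    static boolean needMerge(boolean hasRemoteTargets, boolean needLocalMerge, boolean forceLocalMerge) {
        return hasRemoteTargets || needLocalMerge || forceLocalMerge;
    }
}
```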
**Single bucket join** — same SQL as above, ratio=1.1:
```
4:AGG(update serialize, STREAMING)
13:LOCAL-EXCHANGE(PASSTHROUGH)
3:HASH JOIN (BUCKET_SHUFFLE)
|----12:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ UPGRADED: hash by join key, all 16 instances
| 1:EXCHANGE
11:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ UPGRADED: was BUCKET_HASH, now LOCAL hash
10:LOCAL-EXCHANGE(PASSTHROUGH)
2:OlapScan(orders, POOLING-SCAN)
```
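The upgraded exchanges keep build and probe rows paired. Both sides hash aligned key values with the same function over the same instance count, so equal keys land on the same local instance. The number of busy instances is then no longer bounded by the bucket count.

The Java sketch below is hypothetical. `Objects.hashCode` stands in for whatever hash BE actually uses, and taking the key modulo 4 is only a stand-in for the 4-bucket cap.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical model of local hash routing; Objects.hashCode stands in for the BE hash. */
final class LocalHashRouting {
    // Both join sides apply the same function to aligned key values,
    // so equal keys meet on the same local instance.
    static int instanceFor(Object joinKey, int numInstances) {
        // floorMod keeps the index non-negative for negative hash codes.
        return Math.floorMod(Objects.hashCode(joinKey), numInstances);
    }

    // How many local instances receive at least one of the given keys.
    static int busyInstances(List<?> keys, int numInstances) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(instanceFor(key, numInstances));
        }
        return used.size();
    }
}
```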
**Stacked bucket joins (whole-chain upgrade)**:
```sql
-- orders BUCKETS 4, dim1 BUCKETS 4, dim2 BUCKETS 3
SELECT d1_val, d2_val, SUM(o_totalprice) total
FROM orders
JOIN dim1 ON o_custkey = d1_key
JOIN dim2 ON o_custkey = d2_key
GROUP BY d1_val, d2_val ORDER BY total DESC;
```
Without upgrade (ratio=0) — both joins capped at bucket count:
```
7:AGG(update serialize, STREAMING)
16:LOCAL-EXCHANGE(PASSTHROUGH)
6:HASH JOIN(BUCKET_SHUFFLE) ← join2, capped at 4 instances
|----1:EXCHANGE
5:HASH JOIN(BUCKET_SHUFFLE) ← join1, capped at 4 instances
|----3:EXCHANGE
15:LOCAL-EXCHANGE(BUCKET_HASH)
14:LOCAL-EXCHANGE(PASSTHROUGH)
4:OlapScan(orders, POOLING-SCAN)
```
With upgrade (ratio=1.1) — whole chain upgraded, all 16 instances join:
```
7:AGG(update serialize, STREAMING)
19:LOCAL-EXCHANGE(PASSTHROUGH)
6:HASH JOIN(BUCKET_SHUFFLE)
|----18:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ UPGRADED
| 1:EXCHANGE
17:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ upper join re-aligns
5:HASH JOIN(BUCKET_SHUFFLE)
|----16:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ UPGRADED
| 3:EXCHANGE
15:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH) ← ★ lower join also upgraded
14:LOCAL-EXCHANGE(PASSTHROUGH)
4:OlapScan(orders, POOLING-SCAN)
```
The lower join upgrades and reports NOOP output, so the upper join
inserts its own LE to re-align data to its own join keys.
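The re-alignment rule follows from how the two require kinds are matched, as the `HashJoinNode` comments describe. `requireHash` accepts any hash distribution, while `requireSpecific(LOCAL_EXECUTION_HASH_SHUFFLE)` accepts only that exact type.

The type-only Java model below is a hypothetical simplification; `RequireCheck` and `Dist` are illustrative names. It shows that a NOOP or `BUCKET_HASH_SHUFFLE` child forces an LE. It also shows that a child already reporting `LOCAL_EXECUTION_HASH_SHUFFLE` would suppress the LE even if that child were keyed on different columns.

```java
/** Hypothetical type-only model of requireHash vs requireSpecific matching. */
final class RequireCheck {
    enum Dist { NOOP, PASSTHROUGH, BUCKET_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE }

    // requireHash: any hash distribution satisfies it.
    static boolean satisfiesHash(Dist childOutput) {
        return childOutput == Dist.BUCKET_HASH_SHUFFLE || childOutput == Dist.LOCAL_EXECUTION_HASH_SHUFFLE;
    }

    // requireSpecific(t): only exactly t satisfies it.
    static boolean satisfiesSpecific(Dist childOutput, Dist required) {
        return childOutput == required;
    }

    // An LE is inserted above a child whose output does not satisfy the parent's require.
    static boolean insertsLocalExchange(Dist childOutput, Dist requiredSpecific) {
        return !satisfiesSpecific(childOutput, requiredSpecific);
    }
}
```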
---
### Release note
Add session variable `local_shuffle_bucket_upgrade_ratio` (default 1.5;
values <= 1 disable it). It controls the bucket-to-hash parallelism
upgrade in pooled-scan fragments. When per-BE instances exceed
buckets-with-data by more than this ratio, bucket-shuffle local
exchanges are upgraded to hash exchanges for higher join parallelism.
With the default `bucket_shuffle_downgrade_ratio` (0.8), the upgrade
only engages for stacked or wide bucket-domain shapes.
### Check List (For Author)
- Test
    - [x] Regression test
    - [x] Unit Test
    - [ ] Manual test
    - [ ] No need to test
- Behavior changed:
    - [x] Yes. Bucket-shuffle join fragments may now use
    `LOCAL_EXECUTION_HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE` local
    exchanges when the upgrade ratio is met. Setting
    `local_shuffle_bucket_upgrade_ratio=0` (or <= 1) restores the previous
    behavior.
---
be/src/exec/exchange/vdata_stream_recvr.h | 15 +-
be/src/exec/operator/exchange_source_operator.cpp | 13 +-
be/src/exec/operator/exchange_source_operator.h | 36 ++++
be/src/exec/pipeline/pipeline_fragment_context.cpp | 10 +-
be/src/runtime/runtime_state.cpp | 3 +-
.../glue/translator/PlanTranslatorContext.java | 31 ++++
.../properties/ChildrenPropertiesRegulator.java | 10 +-
.../trees/plans/distribute/DistributePlanner.java | 57 +++++-
.../org/apache/doris/planner/AddLocalExchange.java | 105 +++++++++++
.../org/apache/doris/planner/ExchangeNode.java | 7 +
.../org/apache/doris/planner/HashJoinNode.java | 96 ++++++++--
.../org/apache/doris/planner/RuntimeFilter.java | 37 ++++
.../java/org/apache/doris/qe/SessionVariable.java | 41 +++++
.../planner/LocalShuffleNodeCoverageTest.java | 160 ++++++++++++++++-
gensrc/thrift/PlanNodes.thrift | 6 +
.../test_local_shuffle_bucket_upgrade.groovy | 193 +++++++++++++++++++++
.../test_local_shuffle_rqg_bugs.groovy | 75 ++++++--
17 files changed, 852 insertions(+), 43 deletions(-)
diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h
index 2b395ee61f4..12650e780a1 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -198,7 +198,20 @@ public:
void close();
- void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; }
+ void set_dependency(std::shared_ptr<Dependency> dependency) {
+ // Assign under _lock: set_source_ready() (reached via decrement_senders/cancel/close
+ // on other threads) reads _source_dependency while holding _lock, so a lock-free
+ // shared_ptr assignment here would race with that read.
+ std::lock_guard<std::mutex> l(_lock);
+ _source_dependency = dependency;
+ // A queue created with zero senders (bucket-shuffle orphan instance, see
+ // ExchangeLocalState::create_stream_recvr) never goes through decrement_senders,
+ // so the usual reached-zero set_ready never fires — mark it ready at wiring time
+ // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY.
+ if (_num_remaining_senders == 0) {
+ set_source_ready(l);
+ }
+ }
protected:
struct BlockItem;
diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp
index 1e4d24de8e8..acdfe02d048 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
auto& p = _parent->cast<ExchangeSourceOperatorX>();
+ int num_senders = p.num_senders();
+ if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) {
+ // Bucket-routed senders open one channel per destination entry (one per bucket),
+ // so an instance owning no bucket never gets a channel — and never gets EOS.
+ // Start its receiver with zero senders so it reports EOS immediately instead of
+ // blocking forever (K-of-N destination spread).
+ num_senders = 0;
+ }
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
- state, _memory_used_counter, state->fragment_instance_id(), p.node_id(),
- p.num_senders(), custom_profile(), p.is_merging(),
+ state, _memory_used_counter, state->fragment_instance_id(), p.node_id(), num_senders,
+ custom_profile(), p.is_merging(),
std::max(20480, config::exchg_node_buffer_size_bytes /
(p.is_merging() ? p.num_senders() : 1)));
}
@@ -75,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
+ local_task_idx = info.task_idx;
create_stream_recvr(state);
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h
index c6f651aaffd..446be0559c8 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -19,6 +19,9 @@
#include <stdint.h>
+#include <map>
+#include <set>
+
#include "exec/operator/operator.h"
#include "exprs/vexpr_fwd.h"
@@ -75,6 +78,9 @@ public:
doris::VExprContextSPtrs ordering_expr_ctxs;
int64_t num_rows_skipped;
bool is_ready;
+ // per-BE local instance index (LocalStateInfo::task_idx), used for bucket-shuffle
+ // orphan detection in create_stream_recvr — see is_bucket_shuffle_orphan_instance.
+ int local_task_idx = 0;
std::vector<std::shared_ptr<Dependency>> deps;
@@ -110,6 +116,34 @@ public:
[[nodiscard]] int num_senders() const { return _num_senders; }
[[nodiscard]] bool is_merging() const { return _is_merging; }
+ // Instances that bucket-routed senders can address: values of the fragment's
+ // bucket_seq_to_instance_idx map. Senders open one channel per destination entry
+ // (one per bucket), so an instance owning no bucket never gets a channel — and
+ // never gets EOS. Such orphan instances must start their receiver with zero
+ // senders or they block forever (K-of-N destination spread).
+ void set_bucket_dest_instances(const std::map<int, int>& bucket_seq_to_instance_idx) {
+ for (const auto& [bucket_seq, instance_idx] : bucket_seq_to_instance_idx) {
+ _bucket_dest_instances.insert(instance_idx);
+ }
+ _has_bucket_dest_instances = true;
+ }
+
+ // local_task_idx is the per-BE local instance index (LocalStateInfo::task_idx) — the
+ // same numbering as bucket_seq_to_instance_idx values (built per worker on FE). Do NOT
+ // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL index across all
+ // workers, which only coincides with the local index on the first worker (single-BE
+ // tests pass, multi-BE silently drops every later worker's buckets).
+ //
+ // Ownership-based orphan detection is only valid when destinations follow bucket
+ // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial exchange's
+ // destinations funnel to the first instance per worker regardless of bucket ownership,
+ // and BE's serial-exchange mechanics already close the other receivers.
+ [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) const {
+ return !is_serial_operator() &&
+ _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+ _has_bucket_dest_instances && !_bucket_dest_instances.contains(local_task_idx);
+ }
+
DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
return {TLocalPartitionType::NOOP};
@@ -126,6 +160,8 @@ private:
const int _num_senders;
const bool _is_merging;
const TPartitionType::type _partition_type;
+ std::set<int> _bucket_dest_instances;
+ bool _has_bucket_dest_instances = false;
// use in merge sort
size_t _offset;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index d4f74843179..125c6e18fdd 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1563,8 +1563,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
? _params.per_exch_num_senders.find(tnode.node_id)->second
: 0;
DCHECK_GT(num_senders, 0);
- op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs, num_senders);
+ auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
+ pool, tnode, next_operator_id(), descs, num_senders);
+ if (!_params.bucket_seq_to_instance_idx.empty()) {
+ // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that
+ // no sender channel will ever address — their receivers must start at EOS.
+ exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
+ }
+ op = exchange_op;
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 9e173acab7e..54989511eb8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -508,7 +508,8 @@ Status RuntimeState::register_consumer_runtime_filter(
const TRuntimeFilterDesc& desc, bool need_local_merge, int node_id,
std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
_registered_runtime_filter_ids.insert(desc.filter_id);
- bool need_merge = desc.has_remote_targets || need_local_merge;
+ bool need_merge = desc.has_remote_targets || need_local_merge ||
+ (desc.__isset.force_local_merge && desc.force_local_merge);
RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr();
RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id,
consumer_filter));
// Stamp the consumer with the current recursive CTE stage so that incoming publish RPCs
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index c4ffab8b5dc..f680936b5e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -127,6 +127,21 @@ public class PlanTranslatorContext {
// needs shuffle for correctness, not just for performance like StreamingAgg pre-agg).
private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap();
+ // Whether the fragment currently being processed by AddLocalExchange is eligible for the
+ // bucket → local-hash parallelism upgrade: a pooled bucket-join fragment whose per-BE
+ // instance count exceeds (buckets-with-data per BE) × local_shuffle_bucket_upgrade_ratio.
+ // Computed once per fragment in AddLocalExchange.addLocalExchange from the distributed
+ // plan's LocalShuffleBucketJoinAssignedJob assignments; read by
+ // HashJoinNode.enforceAndDeriveLocalExchange.
+ private boolean currentFragmentBucketUpgradeEligible = false;
+
+ // Per-node "a bucket join above me in this fragment already upgraded to local hash" flag.
+ // An upgraded join marks its direct children so a stacked bucket join below keeps its
+ // BUCKET_HASH_SHUFFLE requires: if it also upgraded, its LOCAL hash output (keyed by ITS
+ // join keys) would type-satisfy the upper join's requireSpecific(LOCAL_EXECUTION_HASH)
+ // and suppress the LE that re-aligns data to the upper join's keys → wrong results.
+ private final Map<PlanNodeId, Boolean> bucketUpgradedAncestorMap = Maps.newHashMap();
+
// Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with
// ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators
// indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops).
@@ -271,6 +286,22 @@ public class PlanTranslatorContext {
return shuffledAncestorMap.getOrDefault(node.getId(), false);
}
+ public void setCurrentFragmentBucketUpgradeEligible(boolean eligible) {
+ this.currentFragmentBucketUpgradeEligible = eligible;
+ }
+
+ public boolean isCurrentFragmentBucketUpgradeEligible() {
+ return currentFragmentBucketUpgradeEligible;
+ }
+
+ public void setHasBucketUpgradedAncestor(PlanNode node, boolean value) {
+ bucketUpgradedAncestorMap.put(node.getId(), value);
+ }
+
+ public boolean hasBucketUpgradedAncestor(PlanNode node) {
+ return bucketUpgradedAncestorMap.getOrDefault(node.getId(), false);
+ }
+
public SlotDescriptor addSlotDesc(TupleDescriptor t) {
return descTable.addSlotDescriptor(t);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 845c87eea9c..e5e0d9d1bd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -311,7 +311,15 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalP
int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
ConnectContext connectContext = ConnectContext.get();
- return totalBucketNum < connectContext.getTotalInstanceNum() * 0.8;
+ // <= 0 disables the downgrade entirely: with the FE local shuffle planner's
+ // bucket -> local-hash upgrade (local_shuffle_bucket_upgrade_ratio), few-bucket
+ // bucket shuffle no longer funnels, so keeping bucket shuffle (anchored side
+ // needs no re-shuffle) can beat downgrading to shuffle join.
+ double downgradeRatio = connectContext.getSessionVariable().getBucketShuffleDowngradeRatio();
+ if (downgradeRatio <= 0) {
+ return false;
+ }
+ return totalBucketNum < connectContext.getTotalInstanceNum() * downgradeRatio;
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 0069ddd37db..fd1ff52f5f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -211,7 +212,7 @@ public class DistributePlanner {
List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote(
receiverPlan, enableShareHashTableForBroadcastJoin, linkNode);
if (linkNode.getPartitionType() == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
- receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances);
+ receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances, linkNode);
}
DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
@@ -231,12 +232,34 @@ public class DistributePlanner {
private List<AssignedJob> getDestinationsByBuckets(
PipelineDistributedPlan joinSide,
- List<AssignedJob> receiverInstances) {
+ List<AssignedJob> receiverInstances,
+ ExchangeNode linkNode) {
UnassignedScanBucketOlapTableJob bucketJob =
(UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+ // The spread is only valid for a NON-serial exchange: a serial exchange
+ // (use_serial_exchange / UNPARTITIONED) receives through one task per worker and
+ // expects funnel destinations; spreading them loses every row addressed to a
+ // non-first instance. Mirrors the !is_serial_operator() gate on the BE orphan
+ // receiver fix.
+ if (isEnableLocalShufflePlanner()
+ && !linkNode.isSerialOperatorOnBe(statementContext.getConnectContext())
+ && !joinSide.getInstanceJobs().isEmpty()
+ && joinSide.getInstanceJobs().stream()
+ .allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+ // When FE local shuffle planner is on, spread bucket destinations across all pooled
+ // instances by their assigned join buckets — the same bucket -> instance mapping as
+ // bucket_seq_to_instance_id sent to BE — instead of funneling every bucket of a worker
+ // into its first instance and relying on BE local exchange to fan out.
+ return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum);
+ }
return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
}
+ private boolean isEnableLocalShufflePlanner() {
+ ConnectContext connectContext = statementContext.getConnectContext();
+ return connectContext != null && connectContext.getSessionVariable().isEnableLocalShufflePlanner();
+ }
+
private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
PipelineDistributedPlan receiverPlan,
boolean enableShareHashTableForBroadcastJoin,
@@ -252,6 +275,36 @@ public class DistributePlanner {
}
}
+ private List<AssignedJob> sortDestinationInstancesByJoinBuckets(
+ PipelineDistributedPlan plan, int bucketNum) {
+ AssignedJob[] instances = new AssignedJob[bucketNum];
+ for (AssignedJob instanceJob : plan.getInstanceJobs()) {
+ LocalShuffleBucketJoinAssignedJob localShuffleJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
+ for (Integer bucketIndex : localShuffleJob.getAssignedJoinBucketIndexes()) {
+ if (instances[bucketIndex] != null) {
+ throw new IllegalStateException(
+ "Multi instances assigned same join bucket: " + instances[bucketIndex]
+ + " and " + instanceJob
+ );
+ }
+ instances[bucketIndex] = instanceJob;
+ }
+ }
+
+ for (int i = 0; i < instances.length; i++) {
+ if (instances[i] == null) {
+ instances[i] = new StaticAssignedJob(
+ i,
+ new TUniqueId(-1, -1),
+ plan.getFragmentJob(),
+ DummyWorker.INSTANCE,
+ new DefaultScanSource(ImmutableMap.of())
+ );
+ }
+ }
+ return Arrays.asList(instances);
+ }
+
private List<AssignedJob> sortDestinationInstancesByBuckets(
PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) {
AssignedJob[] instances = new AssignedJob[bucketNum];
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index e1d607ea615..1adea56c13e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -22,9 +22,18 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* FE-side local exchange planner — inserts {@link LocalExchangeNode} into each fragment's
@@ -81,11 +90,107 @@ public class AddLocalExchange {
if (maxPerBeInstances <= 1) {
continue;
}
+ context.setCurrentFragmentBucketUpgradeEligible(
+ isBucketUpgradeEligible(pipePlan, maxPerBeInstances, context));
PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
addLocalExchangeForFragment(fragment, context);
}
}
+ /**
+ * Bucket → local-hash parallelism upgrade eligibility .
+ *
+ * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism:
+ * each LocalShuffleBucketJoinAssignedJob owns a disjoint set of join buckets and only
+ * instances with buckets do join work (e.g. 8 buckets/BE but 16 instances/BE → 8 idle).
+ * When nothing above the join needs bucket alignment, HashJoinNode can re-distribute
+ * both sides with LOCAL_EXECUTION_HASH_SHUFFLE to use all instances — see
+ * {@link HashJoinNode#enforceAndDeriveLocalExchange}.
+ *
+ * This method computes the per-fragment numeric condition from the actual instance
+ * assignment: maxPerBeInstances > maxBucketsWithDataPerWorker × ratio. The ratio comes
+ * from session variable {@code local_shuffle_bucket_upgrade_ratio}; values <= 1 disable
+ * the upgrade entirely (a required parallelism gain of at most 1x means no gain).
+ */
+ private boolean isBucketUpgradeEligible(PipelineDistributedPlan pipePlan,
+ long maxPerBeInstances, PlanTranslatorContext context) {
+ ConnectContext connectContext = context.getConnectContext();
+ if (connectContext == null || connectContext.getSessionVariable() == null) {
+ return false;
+ }
+ double ratio = connectContext.getSessionVariable().getLocalShuffleBucketUpgradeRatio();
+ List<AssignedJob> instanceJobs = pipePlan.getInstanceJobs();
+ if (instanceJobs.isEmpty()
+ || !instanceJobs.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+ // Only pooled bucket-join fragments have the bucket-count parallelism cap.
+ return false;
+ }
+ Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>();
+ Map<Long, Integer> instancesPerWorker = new HashMap<>();
+ Map<Long, Integer> coresPerWorker = new HashMap<>();
+ for (AssignedJob job : instanceJobs) {
+ long workerId = job.getAssignedWorker().id();
+ bucketsPerWorker.computeIfAbsent(workerId, k -> new HashSet<>())
+ .addAll(((LocalShuffleBucketJoinAssignedJob) job).getAssignedJoinBucketIndexes());
+ instancesPerWorker.merge(workerId, 1, Integer::sum);
+ coresPerWorker.computeIfAbsent(workerId, k -> resolveWorkerCores(job.getAssignedWorker()));
+ }
+ // Conservative: every worker that owns buckets must clear the gain bar. The gain is
+ // computed on EFFECTIVE parallelism (capped by the BE's executor threads): when the
+ // bucket count already saturates the cores, adding instances cannot speed the join
+ // up and the extra local exchange is a pure cost.
+ boolean anyBuckets = false;
+ for (Map.Entry<Long, Set<Integer>> entry : bucketsPerWorker.entrySet()) {
+ int buckets = entry.getValue().size();
+ if (buckets == 0) {
+ continue;
+ }
+ anyBuckets = true;
+ int instances = instancesPerWorker.getOrDefault(entry.getKey(), 0);
+ int cores = coresPerWorker.getOrDefault(entry.getKey(), Integer.MAX_VALUE);
+ if (!shouldUpgradeBucketParallelism(ratio,
+ Math.min(instances, cores), Math.min(buckets, cores))) {
+ return false;
+ }
+ }
+ return anyBuckets;
+ }
+
+ /**
+ * Effective execution threads of the worker's backend (pipelineExecutorSize, falling
+ * back to cpuCores). Values <= 1 mean the heartbeat has not reported yet — treat the
+ * capacity as unknown/uncapped rather than blocking the upgrade.
+ */
+ private static int resolveWorkerCores(
+ org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker worker) {
+ if (worker instanceof org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) {
+ org.apache.doris.system.Backend backend =
+ ((org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) worker).getBackend();
+ int size = backend.getPipelineExecutorSize();
+ if (size <= 1) {
+ size = backend.getCputCores();
+ }
+ if (size > 1) {
+ return size;
+ }
+ }
+ return Integer.MAX_VALUE;
+ }
+
+ /**
+ * Pure numeric gate for the bucket → local-hash upgrade.
+ * ratio <= 1 (including 0 and negatives) always disables; otherwise upgrade when the
+ * per-BE instance count exceeds buckets-with-data × ratio (i.e. the parallelism gain
+ * is at least the configured multiple).
+ */
+ static boolean shouldUpgradeBucketParallelism(double ratio, long maxPerBeInstances,
+ long maxBucketsPerWorker) {
+ if (ratio <= 1.0) {
+ return false;
+ }
+ return maxBucketsPerWorker > 0 && maxPerBeInstances > maxBucketsPerWorker * ratio;
+ }
+
private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) {
DataSink sink = fragment.getSink();
LocalExchangeTypeRequire require = sink == null
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 256f89843eb..8898987d9a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -165,6 +165,13 @@ public class ExchangeNode extends PlanNode {
@Override
public boolean isSerialOperatorOnBe(ConnectContext context) {
+ if (context != null && context.getSessionVariable().isEnableLocalShufflePlanner()) {
+ // When FE local shuffle planner is on, decouple exchange from scan's serial flag.
+ // Scan pooling is handled by LE(PT) after scan; exchange keeps its own parallelism.
+ return fragment != null
+ && isSerialNode()
+ && fragment.useSerialSource(context);
+ }
return fragment != null
&& (isSerialNode() || fragment.hasSerialScanNode())
&& fragment.useSerialSource(context);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 567c99866fd..e9d8ac63041 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -344,31 +344,65 @@ public class HashJoinNode extends JoinNodeBase {
// For a non-serial probe without the flag: propagate the probe's distribution.
outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null;
} else if (isColocate() || isBucketShuffle()) {
- // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution
- // must be preserved on both inputs. A serial child on either side is handled the
- // same way (serial exchange returns NOOP → enforceRequire() inserts the LE).
- probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
- // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
- // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
- // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
- // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
- // build data by bucket to match the probe side's bucket distribution.
- // The serial exchange returns NOOP, so enforceRequire() will insert a
- // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
- // bottleneck avoidance).
- buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
- outputType = AddLocalExchange.resolveExchangeType(
- LocalExchangeTypeRequire.requireBucketHash());
+ if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) {
+ // Bucket → local-hash parallelism upgrade (bucket-to-hash upgrade): the fragment
+ // has noticeably more instances than buckets-with-data (see
+ // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs
+ // bucket alignment — re-distribute both sides by their distribute keys with
+ // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism
+ // instead of being capped at bucket count. The LE keys come from
+ // childrenDistributeExprLists (pairwise-aligned per side, a subset of the
+ // equi-join keys), so both sides keep hashing the same values and the
+ // per-instance build/probe pairing stays correct.
+ //
+ // requireSpecific (not requireHash) on purpose: the children's
+ // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE
+ // is inserted and the join stays bucket-capped.
+ //
+ // Mark direct children so a stacked bucket join below keeps its BUCKET
+ // requires: if it also upgraded, its LOCAL hash output (keyed by ITS join
+ // keys) would type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH) and
+ // suppress the LE that re-aligns data to OUR keys → wrong results.
+ translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);
+ translatorContext.setHasBucketUpgradedAncestor(children.get(1), true);
+ probeSideRequire = LocalExchangeTypeRequire.requireSpecific(
+ LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ buildSideRequire = LocalExchangeTypeRequire.requireSpecific(
+ LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ // Whole-chain upgrade: a stacked bucket join below an upgraded one also
+ // upgrades (16-way instead of bucket-capped), but must NOT let its LOCAL
+ // hash claim type-satisfy the upper join's requireSpecific(LOCAL) — the
+ // keys may differ (each level hashes its own distribute exprs). Claim NOOP
+ // so the upper join always inserts its own re-align LE; that LE existed in
+ // the bucket world too (bucket claim never satisfied LOCAL require), so
+ // the chain upgrade is pure parallelism gain.
+ outputType = translatorContext.hasBucketUpgradedAncestor(this)
+ ? LocalExchangeType.NOOP
+ : null; // null: derived from probeResult.second below
+ } else {
+ probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+ // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
+ // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
+ // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
+ // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
+ // build data by bucket to match the probe side's bucket distribution.
+ // The serial exchange returns NOOP, so enforceRequire() will insert a
+ // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
+ // bottleneck avoidance).
+ buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+ outputType = AddLocalExchange.resolveExchangeType(
+ LocalExchangeTypeRequire.requireBucketHash());
+ }
} else {
// PARTITIONED (shuffle) join: both sides enter via global hash exchange.
// Require GLOBAL specifically so that any inserted exchange uses the same
// instance mapping as the cross-fragment exchange. LOCAL hash has a different
// modulus (per-BE instance count vs total instance count) and would cause
- // join mismatches (DORIS-26101).
+ // join mismatches (cross-fragment exchange key mismatch).
//
// Exception: serial source (use_serial_exchange=true + pooling). The serial
// exchange sends to a single BE so shuffle_idx_to_instance_idx has only one
- // entry — GLOBAL hash would route data to non-existent indices (DORIS-26120).
+ // entry — GLOBAL hash would route data to non-existent indices (serial source global hash fallback).
// Fall back to generic requireHash() which resolves to LOCAL, matching BE's
// _use_serial_source behavior.
boolean serialSource = fragment != null
@@ -394,4 +428,32 @@ public class HashJoinNode extends JoinNodeBase {
protected boolean shouldResetSerialFlagForChild(int childIndex) {
return childIndex == 1;
}
+
+ /**
+ * Whether this bucket-shuffle / colocate join may upgrade its children requires from
+ * BUCKET_HASH_SHUFFLE to LOCAL_EXECUTION_HASH_SHUFFLE for higher parallelism:
+ * <ul>
+ * <li>the fragment passed the numeric gate (instances vs buckets-with-data × ratio),
+ * computed once per fragment in {@code AddLocalExchange};</li>
+ * <li>stacked bucket joins below an upgraded one also upgrade, but report NOOP
+ * output so the upper join's re-align LE is always inserted — see the
+ * whole-chain note in {@code enforceAndDeriveLocalExchange};</li>
+ * <li>the parent does not require bucket distribution of our output (an upper
+ * bucket join's probe/build require — upgrading here would break the bucket
+ * alignment it depends on);</li>
+ * <li>both sides have non-empty distribute exprs — they become the LOCAL hash LE
+ * keys, an exprs-less hash exchange would be meaningless.</li>
+ * </ul>
+ */
+ private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext translatorContext,
+ LocalExchangeTypeRequire parentRequire) {
+ if (!translatorContext.isCurrentFragmentBucketUpgradeEligible()
+ || parentRequire.preferType() == LocalExchangeType.BUCKET_HASH_SHUFFLE) {
+ return false;
+ }
+ List<Expr> probeExprs = getChildDistributeExprList(0);
+ List<Expr> buildExprs = getChildDistributeExprList(1);
+ return probeExprs != null && !probeExprs.isEmpty()
+ && buildExprs != null && !buildExprs.isEmpty();
+ }
}
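The interaction between `requireSpecific`, the upgrade gate and the NOOP claim can be modeled in a few lines. This is a simplified, hypothetical sketch: the enum mirrors the `LocalExchangeType` names in the diff, but the `Require` class, `canUpgrade`, `childRequire` and `output` are illustrative names, not Doris's `LocalExchangeTypeRequire` API. It assumes an "any hash" require is satisfied by any hash-typed claim (which is why the diff avoids `requireHash()` for the upgrade), and a `null` parent preference stands for "no preference".

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, simplified model of the bucket -> local-hash upgrade in HashJoinNode. */
final class BucketUpgradeModel {
    enum LeType {
        NOOP, PASSTHROUGH, PASS_TO_ONE, BUCKET_HASH_SHUFFLE,
        LOCAL_EXECUTION_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE
    }

    private static final Set<LeType> HASH_TYPES = EnumSet.of(
            LeType.BUCKET_HASH_SHUFFLE,
            LeType.LOCAL_EXECUTION_HASH_SHUFFLE,
            LeType.GLOBAL_EXECUTION_HASH_SHUFFLE);

    /** A child requirement: "any hash" (specific == null) or exactly one type. */
    static final class Require {
        final LeType specific;

        private Require(LeType specific) {
            this.specific = specific;
        }

        static Require anyHash() {
            return new Require(null);
        }

        static Require exactly(LeType type) {
            return new Require(type);
        }

        /** True if the child's claimed output already satisfies this require (no LE inserted). */
        boolean satisfiedBy(LeType childOutput) {
            return specific == null ? HASH_TYPES.contains(childOutput) : specific == childOutput;
        }
    }

    /** Same three checks as canUpgradeBucketToLocalHash, with inputs flattened. */
    static boolean canUpgrade(boolean fragmentEligible, LeType parentPreferType,
            boolean probeHasExprs, boolean buildHasExprs) {
        if (!fragmentEligible || parentPreferType == LeType.BUCKET_HASH_SHUFFLE) {
            return false;
        }
        return probeHasExprs && buildHasExprs;
    }

    /** Require placed on both join children. */
    static Require childRequire(boolean upgraded) {
        return Require.exactly(upgraded
                ? LeType.LOCAL_EXECUTION_HASH_SHUFFLE
                : LeType.BUCKET_HASH_SHUFFLE);
    }

    /** Output claimed to the parent, assuming the required LEs were inserted below. */
    static LeType output(boolean upgraded, boolean hasUpgradedAncestor) {
        if (!upgraded) {
            return LeType.BUCKET_HASH_SHUFFLE;
        }
        // Under an upgraded ancestor, claim NOOP so the ancestor re-aligns to its own keys.
        return hasUpgradedAncestor ? LeType.NOOP : LeType.LOCAL_EXECUTION_HASH_SHUFFLE;
    }
}
```

In this model, a `BUCKET_HASH_SHUFFLE` claim satisfies `anyHash()` but not the exact `LOCAL_EXECUTION_HASH_SHUFFLE` require, so an LE is inserted; likewise an inner upgraded join's NOOP claim never satisfies the outer join's require, so the outer join always inserts its own re-align LE.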
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 28062b97b7a..77bb4ffcdc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -257,6 +257,29 @@ public final class RuntimeFilter {
return finalized;
}
+ /**
+ * DFS from {@code node} down to {@code target} within the fragment (stopping at
+ * ExchangeNode boundaries). Returns null if target is not under node, otherwise
+ * whether the path crosses a LocalExchangeNode.
+ */
+ private static Boolean pathCrossesLocalExchange(PlanNode node, PlanNode target) {
+ if (node == target) {
+ return false;
+ }
+ for (PlanNode child : node.getChildren()) {
+ if (child instanceof ExchangeNode) {
+ // fragment boundary: a target behind it is a remote target, handled by
+ // has_remote_targets
+ continue;
+ }
+ Boolean sub = pathCrossesLocalExchange(child, target);
+ if (sub != null) {
+ return sub || child instanceof LocalExchangeNode;
+ }
+ }
+ return null;
+ }
+
/**
* Serializes a runtime filter to Thrift.
*/
@@ -270,11 +293,25 @@ public final class RuntimeFilter {
tFilter.setHasRemoteTargets(hasRemoteTargets);
boolean hasSerialTargets = false;
+ boolean forceLocalMerge = false;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(),
ExprToThriftVisitor.treeToThrift(target.expr));
hasSerialTargets = hasSerialTargets
|| target.node.isSerialOperatorOnBe(ConnectContext.get());
+ // Truthful merge signal: if a LocalExchangeNode sits between the builder join
+ // and a same-fragment target scan, per-instance partial filters are not aligned
+ // with the scan's data slice and must be merged before being applied. BE used to
+ // infer this from the target scan's is_serial_operator (scan pooled => LE
+ // in between), which silently breaks once the scan is parallelized; this bit is
+ // computed from the actual plan after FE local exchange planning. In BE-planned
+ // mode (planner off) the FE tree has no LocalExchangeNodes and the bit stays
+ // false — the serial-flag inference still covers that world.
+ if (!forceLocalMerge && target.isLocalTarget) {
+ Boolean crossed = pathCrossesLocalExchange(builderNode, target.node);
+ forceLocalMerge = crossed != null && crossed;
+ }
}
+ tFilter.setForceLocalMerge(forceLocalMerge);
boolean enableSyncFilterSize = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
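The `pathCrossesLocalExchange` DFS and the `force_local_merge` bit derived from it can be exercised on a toy tree. The `Node` and `Kind` types below are hypothetical stand-ins (`Kind.EXCHANGE` for `ExchangeNode`, `Kind.LOCAL_EXCHANGE` for `LocalExchangeNode`); the traversal follows the diff's logic of skipping exchange children (fragment boundaries), returning `null` when the target is unreachable, and OR-ing in "this child is a local exchange" on the way back up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy re-implementation of the pathCrossesLocalExchange DFS on a hypothetical Node type. */
final class LePathDemo {
    enum Kind { EXCHANGE, LOCAL_EXCHANGE, OTHER }

    static final class Node {
        final String name;
        final Kind kind;
        final List<Node> children;

        Node(String name, Kind kind, Node... children) {
            this.name = name;
            this.kind = kind;
            this.children = new ArrayList<>(Arrays.asList(children));
        }
    }

    /**
     * null if target is not reachable from node without crossing an EXCHANGE;
     * otherwise whether the path from node to target passes through a LOCAL_EXCHANGE.
     */
    static Boolean pathCrossesLocalExchange(Node node, Node target) {
        if (node == target) {
            return false;
        }
        for (Node child : node.children) {
            if (child.kind == Kind.EXCHANGE) {
                continue; // fragment boundary: targets behind it are remote targets
            }
            Boolean sub = pathCrossesLocalExchange(child, target);
            if (sub != null) {
                return sub || child.kind == Kind.LOCAL_EXCHANGE;
            }
        }
        return null;
    }

    /** True if any same-fragment target sits behind a local exchange. */
    static boolean forceLocalMerge(Node builder, List<Node> localTargets) {
        for (Node target : localTargets) {
            if (Boolean.TRUE.equals(pathCrossesLocalExchange(builder, target))) {
                return true;
            }
        }
        return false;
    }
}
```

For a join whose probe scan feeds it through a local exchange, the DFS returns `true`; a scan hanging directly under the join returns `false`; a scan behind an `EXCHANGE` is not found at all (`null`) and is left to `has_remote_targets`.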
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2870c3443fe..01967590dfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -361,6 +361,10 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = "enable_local_shuffle_planner";
+ public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = "local_shuffle_bucket_upgrade_ratio";
+
+ public static final String BUCKET_SHUFFLE_DOWNGRADE_RATIO = "bucket_shuffle_downgrade_ratio";
+
public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle";
public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort";
@@ -1639,6 +1643,27 @@ public class SessionVariable implements Serializable, Writable {
"Whether to force to local shuffle on pipelineX engine."})
private boolean forceToLocalShuffle = false;
+ @VarAttrDef.VarAttr(
+ name = LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+ description = {"FE规划Local Shuffle时, 当池化bucket join所在fragment的每BE实例数大于"
+ + "每BE有数据分桶数的该倍数时, 将join两侧的桶分布本地重分发为hash分布以突破桶数并发上限。"
+ + "必须大于1才生效; 小于等于1(含0和负数)时关闭该优化",
+ "When FE plans local shuffle and a pooled bucket join fragment has more instances"
+ + " per BE than (buckets-with-data per BE) * this ratio, re-distribute both join"
+ + " sides with local hash instead of bucket hash so join parallelism is no longer"
+ + " capped at bucket count. Only takes effect when > 1; values <= 1 (including 0"
+ + " and negatives) disable the upgrade."}, needForward = true)
+ private double localShuffleBucketUpgradeRatio = 1.5;
+
+ @VarAttrDef.VarAttr(
+ name = BUCKET_SHUFFLE_DOWNGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+ description = {"当一侧基表总桶数小于总实例数的该倍数时, 放弃bucket shuffle join降级为shuffle join。"
+ + "小于等于0时永不降级。默认0.8保持原有行为",
+ "Downgrade bucket shuffle join to shuffle join when the base table side's total"
+ + " bucket count is less than total instance count times this ratio. Values <= 0"
+ + " never downgrade. Default 0.8 keeps the original behavior."}, needForward = true)
+ private double bucketShuffleDowngradeRatio = 0.8;
+
@VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
private boolean enableLocalMergeSort = true;
@@ -4768,6 +4793,22 @@ public class SessionVariable implements Serializable, Writable {
this.enableLocalShufflePlanner = enableLocalShufflePlanner;
}
+ public double getLocalShuffleBucketUpgradeRatio() {
+ return localShuffleBucketUpgradeRatio;
+ }
+
+ public void setLocalShuffleBucketUpgradeRatio(double localShuffleBucketUpgradeRatio) {
+ this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio;
+ }
+
+ public double getBucketShuffleDowngradeRatio() {
+ return bucketShuffleDowngradeRatio;
+ }
+
+ public void setBucketShuffleDowngradeRatio(double bucketShuffleDowngradeRatio) {
+ this.bucketShuffleDowngradeRatio = bucketShuffleDowngradeRatio;
+ }
+
public boolean enablePushDownNoGroupAgg() {
return enablePushDownNoGroupAgg;
}
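The `bucket_shuffle_downgrade_ratio` description states a simple threshold rule. Below is a hypothetical sketch of that rule as written in the variable's description; the class and method names are illustrative, and the real check lives in the Nereids optimizer, which this diff does not show.

```java
/**
 * Hypothetical sketch of the rule described for bucket_shuffle_downgrade_ratio.
 * Names are illustrative; this is not the Nereids implementation.
 */
final class BucketShuffleDowngrade {
    private BucketShuffleDowngrade() {
    }

    /**
     * True if a bucket-shuffle join should fall back to a shuffle join: the base-table
     * side's total bucket count is below totalInstanceNum * ratio. ratio <= 0 never downgrades.
     */
    static boolean shouldDowngrade(double ratio, int totalBucketNum, int totalInstanceNum) {
        if (ratio <= 0) {
            return false;
        }
        return totalBucketNum < totalInstanceNum * ratio;
    }
}
```

With the default 0.8, a 4-bucket table under 16 total instances is downgraded (4 < 12.8), while 16 buckets are not (16 < 12.8 is false). Setting the ratio to 0, as the regression suite in this commit does, keeps the bucket-shuffle join from being downgraded.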
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 7288ba49ca2..5ffe61ce8f5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.GroupingInfo;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.OrderByElement;
import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
@@ -229,12 +230,12 @@ public class LocalShuffleNodeCoverageTest {
hashJoin.setDistributionMode(DistributionMode.PARTITIONED);
Pair<PlanNode, LocalExchangeType> hashOutput = hashJoin.enforceAndDeriveLocalExchange(
ctx, null, LocalExchangeTypeRequire.requireHash());
- // PARTITIONED join requires GLOBAL hash to match cross-fragment exchange (DORIS-26101)
+ // PARTITIONED join requires GLOBAL hash to match cross-fragment exchange
Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, hashOutput.second);
assertChildLocalExchangeType(hashJoin, 0, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
assertChildLocalExchangeType(hashJoin, 1, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
- // DORIS-26101: PARTITIONED join with probe child already providing GLOBAL hash
+ // PARTITIONED join with probe child already providing GLOBAL hash
// (e.g. upstream ExchangeNode) should satisfy requireGlobalExecutionHash without
// inserting a new exchange.
TrackingPlanNode probeGlobal = new TrackingPlanNode(nextPlanNodeId(),
@@ -251,7 +252,7 @@ public class LocalShuffleNodeCoverageTest {
"no exchange should be inserted when child already provides GLOBAL hash");
Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1));
- // DORIS-26120: PARTITIONED join with serial source falls back to LOCAL hash
+ // PARTITIONED join with serial source falls back to LOCAL hash
// because GLOBAL shuffle_idx_to_instance_idx is incomplete for serial exchange.
TrackingScanNode probeSerial = new TrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
TrackingPlanNode buildSerial = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
@@ -312,6 +313,157 @@ public class LocalShuffleNodeCoverageTest {
assertChildLocalExchangeType(serialBuildBroadcast, 1, LocalExchangeType.PASS_TO_ONE);
}
+ private static List<List<Expr>> mockDistributeExprLists() {
+ return Lists.newArrayList(
+ Collections.singletonList(Mockito.mock(SlotRef.class)),
+ Collections.singletonList(Mockito.mock(SlotRef.class)));
+ }
+
+ @Test
+ public void testHashJoinBucketUpgradeToLocalHash() {
+ List<Expr> eqConjuncts = Collections.singletonList(Mockito.mock(BinaryPredicate.class));
+
+ // 1. Eligible fragment + parent doesn't need bucket → both sides re-distributed
+ // with LOCAL_EXECUTION_HASH_SHUFFLE, output reports LOCAL hash.
+ PlanTranslatorContext upgradeCtx = new PlanTranslatorContext();
+ upgradeCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode probeBucket = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.BUCKET_HASH_SHUFFLE);
+ TrackingPlanNode buildNoop = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode upgradedJoin = new HashJoinNode(nextPlanNodeId(), probeBucket, buildNoop,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ upgradedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ upgradedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> upgradedOutput = upgradedJoin.enforceAndDeriveLocalExchange(
+ upgradeCtx, null, LocalExchangeTypeRequire.requireHash());
+ // BUCKET claim must NOT satisfy the upgrade's requireSpecific(LOCAL_EXECUTION_HASH):
+ // an LE is inserted on both sides.
+ Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, upgradedOutput.second);
+ assertChildLocalExchangeType(upgradedJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ assertChildLocalExchangeType(upgradedJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+ // 2. Child already providing LOCAL hash satisfies the upgraded require — no extra LE.
+ PlanTranslatorContext satisfiedCtx = new PlanTranslatorContext();
+ satisfiedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode probeLocal = new TrackingPlanNode(nextPlanNodeId(),
+ LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ TrackingPlanNode buildLocal = new TrackingPlanNode(nextPlanNodeId(),
+ LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ HashJoinNode satisfiedJoin = new HashJoinNode(nextPlanNodeId(), probeLocal, buildLocal,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ satisfiedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ satisfiedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> satisfiedUpgrade = satisfiedJoin.enforceAndDeriveLocalExchange(
+ satisfiedCtx, null, LocalExchangeTypeRequire.requireHash());
+ Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, satisfiedUpgrade.second);
+ Assertions.assertSame(probeLocal, satisfiedJoin.getChild(0));
+ Assertions.assertSame(buildLocal, satisfiedJoin.getChild(1));
+
+ // 3. Parent requires bucket distribution (upper bucket join) → no upgrade even when
+ // the fragment is eligible: children keep BUCKET_HASH_SHUFFLE.
+ PlanTranslatorContext parentBucketCtx = new PlanTranslatorContext();
+ parentBucketCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode probeForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ TrackingPlanNode buildForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode bucketParentJoin = new HashJoinNode(nextPlanNodeId(), probeForBucketParent,
+ buildForBucketParent, JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(),
+ null, null, false);
+ bucketParentJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ bucketParentJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> bucketParentOutput = bucketParentJoin.enforceAndDeriveLocalExchange(
+ parentBucketCtx, null, LocalExchangeTypeRequire.requireBucketHash());
+ Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, bucketParentOutput.second);
+ assertChildLocalExchangeType(bucketParentJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+ assertChildLocalExchangeType(bucketParentJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+ // 4. Fragment not eligible (ratio gate failed / not a pooled bucket fragment) →
+ // existing behavior untouched.
+ PlanTranslatorContext ineligibleCtx = new PlanTranslatorContext();
+ TrackingPlanNode probeIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ TrackingPlanNode buildIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode ineligibleJoin = new HashJoinNode(nextPlanNodeId(), probeIneligible, buildIneligible,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ ineligibleJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ ineligibleJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> ineligibleOutput = ineligibleJoin.enforceAndDeriveLocalExchange(
+ ineligibleCtx, null, LocalExchangeTypeRequire.requireHash());
+ Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, ineligibleOutput.second);
+ assertChildLocalExchangeType(ineligibleJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+ assertChildLocalExchangeType(ineligibleJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+ // 5. Stacked bucket joins: the whole chain upgrades. The inner join (direct probe
+ // child of the upgraded one) also upgrades its children to LOCAL hash, but
+ // reports NOOP output so the outer join always inserts its own re-align LE
+ // (keys may differ between levels).
+ PlanTranslatorContext stackedCtx = new PlanTranslatorContext();
+ stackedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ TrackingPlanNode innerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode innerJoin = new HashJoinNode(nextPlanNodeId(), innerProbe, innerBuild,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ innerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ innerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ TrackingPlanNode outerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode outerJoin = new HashJoinNode(nextPlanNodeId(), innerJoin, outerBuild,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ outerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ outerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> stackedOutput = outerJoin.enforceAndDeriveLocalExchange(
+ stackedCtx, null, LocalExchangeTypeRequire.requireHash());
+ Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, stackedOutput.second);
+ // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning inner's output)
+ assertChildLocalExchangeType(outerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ assertChildLocalExchangeType(outerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ // inner upgraded too (whole-chain): its children get LOCAL hash LEs
+ assertChildLocalExchangeType(innerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ assertChildLocalExchangeType(innerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+ // 6. Colocate join takes the same upgrade path.
+ PlanTranslatorContext colocateCtx = new PlanTranslatorContext();
+ colocateCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode colocateProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ TrackingPlanNode colocateBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode colocateJoin = new HashJoinNode(nextPlanNodeId(), colocateProbe, colocateBuild,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ colocateJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+ colocateJoin.setColocate(true, "test");
+ Pair<PlanNode, LocalExchangeType> colocateOutput = colocateJoin.enforceAndDeriveLocalExchange(
+ colocateCtx, null, LocalExchangeTypeRequire.requireHash());
+ Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, colocateOutput.second);
+ assertChildLocalExchangeType(colocateJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ assertChildLocalExchangeType(colocateJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+ // 7. Missing distribute exprs → no upgrade (the LOCAL hash LE would have no keys).
+ PlanTranslatorContext noExprCtx = new PlanTranslatorContext();
+ noExprCtx.setCurrentFragmentBucketUpgradeEligible(true);
+ TrackingPlanNode probeNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ TrackingPlanNode buildNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+ HashJoinNode noExprJoin = new HashJoinNode(nextPlanNodeId(), probeNoExpr, buildNoExpr,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+ noExprJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+ Pair<PlanNode, LocalExchangeType> noExprOutput = noExprJoin.enforceAndDeriveLocalExchange(
+ noExprCtx, null, LocalExchangeTypeRequire.requireHash());
+ Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, noExprOutput.second);
+ assertChildLocalExchangeType(noExprJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+ assertChildLocalExchangeType(noExprJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+ }
+
+
+ @Test
+ public void testShouldUpgradeBucketParallelismGate() {
+ // ratio <= 1 (including 0 and negatives) always disables — the knob doubles as the
+ // off switch: requiring at most 1x parallelism gain means no gain.
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(0, 16, 8));
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(-1, 16, 8));
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.0, 16, 8));
+ // active threshold: instances must exceed buckets-with-data × ratio
+ Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 8));
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 12, 8));
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(2.0, 16, 8));
+ Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 256, 8));
+ // no buckets with data → nothing to upgrade
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 0));
+ }
+
@Test
public void testLocalExchangeNodeIsNotSerializedAsSerialOperator() {
SerialTrackingScanNode serialScan = new SerialTrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
@@ -441,7 +593,7 @@ public class LocalShuffleNodeCoverageTest {
intersectNode.addChild(right);
Pair<PlanNode, LocalExchangeType> intersectOutput = intersectNode.enforceAndDeriveLocalExchange(
ctx, null, LocalExchangeTypeRequire.requireHash());
- // PARTITIONED intersect requires GLOBAL hash (DORIS-26100)
+ // PARTITIONED intersect requires GLOBAL hash
Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, intersectOutput.second);
assertChildLocalExchangeType(intersectNode, 0, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
assertChildLocalExchangeType(intersectNode, 1, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
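`testShouldUpgradeBucketParallelismGate` pins down the numeric gate precisely enough to reconstruct it. The sketch below is a hypothetical reconstruction: the argument order `(ratio, instances, bucketsWithData)` is inferred from those assertions and their comments, and the body of the real `AddLocalExchange.shouldUpgradeBucketParallelism` is not part of this diff.

```java
/**
 * Hypothetical reconstruction of the numeric gate exercised by
 * testShouldUpgradeBucketParallelismGate; argument order inferred from its assertions.
 */
final class BucketUpgradeGate {
    private BucketUpgradeGate() {
    }

    static boolean shouldUpgradeBucketParallelism(double ratio, int instances, int bucketsWithData) {
        // ratio <= 1 doubles as the off switch; no buckets with data means nothing to upgrade.
        if (ratio <= 1.0 || bucketsWithData <= 0) {
            return false;
        }
        // Upgrade only when instances strictly exceed buckets-with-data x ratio.
        return instances > bucketsWithData * ratio;
    }
}
```

The strict `>` explains the boundary cases: 12 instances over 8 buckets at ratio 1.5 stay bucket-capped (12 > 12 is false), as do 16 over 8 at ratio 2.0. Under the regression suite's assumption of min(16, cores) instances against min(4, cores) buckets at ratio 1.1, a 5-core machine passes (5 > 4.4) and a 4-core one does not.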
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4f8826bdb3f..fb8ef30150e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1609,6 +1609,12 @@ struct TRuntimeFilterDesc {
// the listed partitions with the listed direction; absent partitions are
// unsafe for this RF target and must not be pruned by it.
20: optional map<Types.TPlanNodeId, list<TPartitionTargetExprMonotonicity>> planId_to_partition_target_monotonicity;
+
+ // True when a local exchange sits between the filter builder (join) and a same-fragment
+ // target scan: per-instance partial filters are then NOT aligned with the scan's data
+ // slice and must be merged before being applied. Computed truthfully by FE after local
+ // exchange planning; replaces inferring this from the target scan's is_serial_operator.
+ 21: optional bool force_local_merge;
}
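Why `force_local_merge` matters can be shown with a MIN_MAX filter (one of the types the regression suite enables via `runtime_filter_type='IN,MIN_MAX'`). Suppose join instance 0 receives build keys {1, 5} and instance 1 receives {10, 20}. If a local exchange sits between the target scan and the join, scan instance 0 reads rows that may be routed to either join instance, so applying instance 0's partial filter alone would drop a probe row with key 10 that instance 1 needs. The sketch below is a hypothetical illustration; `PartialFilterMerge` and `MinMax` are not Doris's BE runtime-filter classes.

```java
import java.util.List;

/** Hypothetical MIN_MAX runtime filter showing why partial filters must be merged. */
final class PartialFilterMerge {
    private PartialFilterMerge() {
    }

    static final class MinMax {
        final long min;
        final long max;

        MinMax(long min, long max) {
            this.min = min;
            this.max = max;
        }

        /** Builds a partial filter from the build keys one join instance received (assumed non-empty). */
        static MinMax build(List<Long> keys) {
            long lo = Long.MAX_VALUE;
            long hi = Long.MIN_VALUE;
            for (long k : keys) {
                lo = Math.min(lo, k);
                hi = Math.max(hi, k);
            }
            return new MinMax(lo, hi);
        }

        /** Merges partial filters: min of mins, max of maxes. */
        static MinMax merge(List<MinMax> parts) {
            long lo = Long.MAX_VALUE;
            long hi = Long.MIN_VALUE;
            for (MinMax p : parts) {
                lo = Math.min(lo, p.min);
                hi = Math.max(hi, p.max);
            }
            return new MinMax(lo, hi);
        }

        /** A row passes the filter if its key lies inside [min, max]. */
        boolean mightContain(long key) {
            return key >= min && key <= max;
        }
    }
}
```

The partial filter of instance 0 covers [1, 5] and rejects key 10; the merged filter covers [1, 20] and keeps it, which is the behavior the FE now requests explicitly when the DFS finds a local exchange on the path.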
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
new file mode 100644
index 00000000000..8d493c54e87
--- /dev/null
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Bucket -> local-hash parallelism upgrade.
+ *
+ * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism
+ * (only instances owning buckets do join work). When nothing above the join needs
+ * bucket alignment and per-BE instances > buckets-with-data x ratio
+ * (session var local_shuffle_bucket_upgrade_ratio, > 1 enables, <= 1 disables),
+ * the FE planner re-distributes both join sides with LOCAL_EXECUTION_HASH_SHUFFLE
+ * so the join uses all instances.
+ *
+ * Shape notes (verified against a live cluster):
+ * - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN
+ * renders the tree before AddLocalExchange runs).
+ * - Whether a bucket-shuffle join forms is cluster-dependent (Nereids downgrades it
+ * when totalBucketNum < totalInstanceNum * bucket_shuffle_downgrade_ratio). The suite
+ * pins bucket_shuffle_downgrade_ratio=0 to keep it forming, but the plan-shape checks
+ * still gate on "did a BUCKET_HASH_SHUFFLE local exchange actually appear?" and skip
+ * if not, so the test never hard-fails on an environment where it didn't form.
+ * - The upgrade fires when min(task_num=16, cores) / min(buckets=4, cores) > 1.1, i.e.
+ * on any machine with >= 5 cores (5/4 = 1.25 > 1.1); CI and dev machines comfortably
+ * exceed this. Bucket counts are kept low (4/3/3) so a modest core count is enough.
+ * - The aggregation above must NOT group by the bucket key: a colocate agg
+ * requires bucket distribution of the join output and correctly blocks the
+ * upgrade via the parentRequire gate.
+ */
+suite("test_local_shuffle_bucket_upgrade") {
+
+ def hints = { ls_on, ratio ->
+ """/*+SET_VAR(
+ enable_sql_cache=false, disable_join_reorder=true,
+ disable_colocate_plan=true,
+ auto_broadcast_join_threshold=-1, broadcast_row_count_limit=0,
+ experimental_force_to_local_shuffle=true,
+ experimental_enable_parallel_scan=false,
+ enable_runtime_filter_prune=false,
+ enable_runtime_filter_partition_prune=false,
+ runtime_filter_type='IN,MIN_MAX',
+ parallel_pipeline_task_num=16,
+ parallel_exchange_instance_num=8,
+ query_timeout=600,
+ bucket_shuffle_downgrade_ratio=0,
+ local_shuffle_bucket_upgrade_ratio=${ratio},
+ enable_local_shuffle=${ls_on},
+ enable_local_shuffle_planner=${ls_on}
+ )*/"""
+ }
+
+ sql "DROP TABLE IF EXISTS lsbu_fact"
+ sql "DROP TABLE IF EXISTS lsbu_probe"
+ sql "DROP TABLE IF EXISTS lsbu_probe2"
+ sql """CREATE TABLE lsbu_fact (k INT, v BIGINT)
+ ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 4
+ PROPERTIES ("replication_num"="1")"""
+ sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT)
+ ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
+ PROPERTIES ("replication_num"="1")"""
+ sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT)
+ ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
+ PROPERTIES ("replication_num"="1")"""
+ sql """INSERT INTO lsbu_fact
+ SELECT CAST(number%50 AS INT), number*10+1
+ FROM numbers("number"="200")"""
+ sql """INSERT INTO lsbu_probe
+ SELECT CAST(number AS INT), CAST(number%50 AS INT), 1000+number
+ FROM numbers("number"="300")"""
+ sql """INSERT INTO lsbu_probe2
+ SELECT CAST(number AS INT), CAST(number%50 AS INT), 2000+number
+ FROM numbers("number"="170")"""
+
+ // group key pk%10 is NOT the bucket key, so the agg above does not require
+ // bucket distribution and the upgrade is allowed.
+ def singleJoin = { h ->
+ """SELECT ${h} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw
+ FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+ GROUP BY g ORDER BY g"""
+ }
+
+ // ---------- 1. plan shape (EXPLAIN DISTRIBUTED PLAN: post-AddLocalExchange) ----------
+ // The upgrade replaces the bucket join's BUCKET_HASH_SHUFFLE local exchange with
+ // LOCAL_EXECUTION_HASH_SHUFFLE. "BUCKET_HASH_SHUFFLE" names ONLY a local-exchange type
+ // (the join op prints "BUCKET_SHUFFLE", the network sink "BUCKET_SHFFULE_HASH_PARTITIONED"),
+ // so it is an unambiguous, fragment-local signal of the thing being upgraded — unlike
+ // LOCAL_EXECUTION_HASH, which an agg-finalize fragment may also carry on a multi-BE
+ // cluster regardless of the gate.
+ //
+ // First confirm a bucket-shuffle local exchange actually formed; if it did not (cluster
+ // shaped the join differently), there is nothing to upgrade, so skip rather than fail.
+ def countBucketHashLe = { String planText -> planText.split("BUCKET_HASH_SHUFFLE").length - 1 }
+
+ def bucketText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '0'))}").toString()
+ int bucketLeCount = countBucketHashLe(bucketText)
+ if (bucketLeCount == 0) {
+ logger.warn("bucket-shuffle join did not form in this environment; "
+ + "skipping single-join upgrade plan-shape checks")
+ } else {
+ // ratio=1.1 upgrades the bucket join → all BUCKET_HASH local exchanges are gone
+ def upgradedText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.1'))}").toString()
+ assertEquals(0, countBucketHashLe(upgradedText),
+ "ratio=1.1 must upgrade away the bucket join's BUCKET_HASH_SHUFFLE local exchanges")
+
+ // ratio <= 1 disables the upgrade → bucket-hash local exchanges unchanged
+ def ratioOneText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1'))}").toString()
+ assertEquals(bucketLeCount, countBucketHashLe(ratioOneText),
+ "ratio=1 must keep the upgrade off (<=1 disables)")
+ }
+
+ // Note: whether a group-by-bucket-key agg blocks the upgrade depends on the agg
+ // shape the optimizer picks (a colocate one-phase agg requires bucket distribution
+ // and blocks it; a two-phase agg does not). That parentRequire gate is covered
+ // deterministically by LocalShuffleNodeCoverageTest; here we only pin correctness.
+ def bucketKeyAgg = { h ->
+ """SELECT ${h} f.k AS g, COUNT(*) c, SUM(p.w) sw
+ FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+ GROUP BY g ORDER BY g"""
+ }
+ def bka_baseline = sql bucketKeyAgg(hints('false', '0'))
+ def bka_upgraded = sql bucketKeyAgg(hints('true', '1.1'))
+ assertEquals(50, bka_baseline.size())
+ assertEquals(bka_baseline, bka_upgraded,
+ "group-by-bucket-key agg over (possibly upgraded) bucket join must stay correct")
+
+ // ---------- 2. correctness: single bucket join ----------
+ def single_baseline = sql singleJoin(hints('false', '0'))
+ def single_bucket = sql singleJoin(hints('true', '0'))
+ def single_upgraded = sql singleJoin(hints('true', '1.1'))
+
+ assertEquals(10, single_baseline.size())
+ assertEquals(single_baseline, single_bucket,
+ "bucket join (upgrade off) must match local-shuffle-off baseline")
+ assertEquals(single_baseline, single_upgraded,
+ "upgraded bucket join must match local-shuffle-off baseline")
+
+ // ---------- 3. correctness: stacked bucket joins ----------
+ def stackedJoin = { h ->
+ """SELECT ${h} p1.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p1.w) s1, SUM(p2.w) s2
+ FROM lsbu_fact f
+ JOIN lsbu_probe p1 ON p1.k = f.k
+ JOIN lsbu_probe2 p2 ON p2.k = f.k
+ GROUP BY g ORDER BY g"""
+ }
+
+ // whole-chain shape: at an eligible ratio every level of the stacked bucket chain
+ // upgrades (the lower join reports NOOP so the upper re-align LE is kept), so all
+ // BUCKET_HASH local exchanges are upgraded away. Skip if the chain didn't form here.
+ def stackedBucketText = (sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '0'))}").toString()
+ if (countBucketHashLe(stackedBucketText) == 0) {
+ logger.warn("stacked bucket-shuffle chain did not form in this environment; "
+ + "skipping stacked upgrade plan-shape check")
+ } else {
+ def stackedUpgradedText = (sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '1.1'))}").toString()
+ assertEquals(0, countBucketHashLe(stackedUpgradedText),
+ "ratio=1.1 must upgrade away the stacked bucket chain's BUCKET_HASH local exchanges")
+ }
+
+ // Forced-RF killer case: with the upgrade, the join build is hash-sliced; the
+ // per-instance IN/MIN_MAX partial filters MUST be merged before application
+ // (TRuntimeFilterDesc.force_local_merge). Before that fix this query silently
+ // lost up to 96% of its rows.
+ def rfHints = { ratio ->
+ hints('true', ratio).replace(")*/",
+ ", enable_runtime_filter_prune=false, runtime_filter_type='IN,MIN_MAX')*/")
+ }
+ def single_up_rf = sql "SELECT ${rfHints('1.1')} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k GROUP BY g ORDER BY g"
+ assertEquals(single_baseline, single_up_rf,
+ "upgraded bucket join with forced IN/MIN_MAX runtime filters must stay correct")
+
+ def stacked_baseline = sql stackedJoin(hints('false', '0'))
+ def stacked_bucket = sql stackedJoin(hints('true', '0'))
+ def stacked_upgraded = sql stackedJoin(hints('true', '1.1'))
+
+ assertEquals(10, stacked_baseline.size())
+ assertEquals(stacked_baseline, stacked_bucket,
+ "stacked bucket joins (upgrade off) must match local-shuffle-off baseline")
+ assertEquals(stacked_baseline, stacked_upgraded,
+ "stacked bucket joins (upgrade on) must match local-shuffle-off baseline")
+}
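The forced-RF case in the suite above rests on one observation: once the upgrade hash-slices the join build, each instance's IN filter covers only its own slice of build keys, while the probe rows a scan instance filters are assumed to still span every slice. The Java toy model below is a sketch under those assumptions, not Doris code. The class and method names, the 4-instance parallelism, and the modulo hash are all hypothetical. The row counts only mirror the `lsbu_fact`/`lsbu_probe` test data.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model (not Doris code): why per-instance partial IN filters
// must be merged when the join build side is hash-sliced across instances.
public class RuntimeFilterMergeSketch {
    static final int INSTANCES = 4;      // hypothetical local parallelism
    static final int DISTINCT_KEYS = 50; // mirrors lsbu_fact: k = number % 50
    static final int PROBE_ROWS = 300;   // mirrors lsbu_probe: k = number % 50

    // Hypothetical hash slicing: build key -> owning instance.
    static int slice(int key) {
        return Math.floorMod(key, INSTANCES);
    }

    // Partial IN filter built by one instance: only the keys of its own slice.
    static Set<Integer> partialFilter(int instance) {
        Set<Integer> keys = new HashSet<>();
        for (int k = 0; k < DISTINCT_KEYS; k++) {
            if (slice(k) == instance) {
                keys.add(k);
            }
        }
        return keys;
    }

    // Merged IN filter: the union of every instance's partial filter.
    static Set<Integer> mergedFilter() {
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < INSTANCES; i++) {
            keys.addAll(partialFilter(i));
        }
        return keys;
    }

    // Counts probe rows that pass the filter; probe keys cover every slice.
    static int survivingRows(Set<Integer> filter) {
        int kept = 0;
        for (int n = 0; n < PROBE_ROWS; n++) {
            if (filter.contains(n % DISTINCT_KEYS)) {
                kept++;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println("partial: " + survivingRows(partialFilter(0)) + " / " + PROBE_ROWS);
        System.out.println("merged:  " + survivingRows(mergedFilter()) + " / " + PROBE_ROWS);
    }
}
```

Running `main` prints 78 of 300 probe rows surviving instance 0's partial filter and 300 of 300 with the merged filter. Every probe row has a join partner, so the partial filter silently drops rows the join should return. In this model, a partial filter keeps roughly 1/N of the rows for N slices.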
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index e369dc0f11e..ca3fe027c47 100644
--- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -19,7 +19,7 @@
* Regression tests for bugs discovered by RQG testing on the local-exchange2 branch.
*
* These queries triggered "must set shared state" errors or incorrect results
- * in RQG build 183992. Common conditions:
+ * in RQG testing. Common conditions:
* - use_serial_exchange=true (makes ALL Exchanges serial, not just UNPARTITIONED)
* - enable_local_shuffle_planner=true (FE-planned local exchange)
* - parallel_pipeline_task_num > 1
@@ -65,7 +65,7 @@ suite("test_local_shuffle_rqg_bugs") {
PROPERTIES ("replication_num" = "1")
"""
- // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns
+ // Table for RQG testing GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns
sql """
CREATE TABLE rqg_t3 (
pk INT NOT NULL,
@@ -354,7 +354,7 @@ suite("test_local_shuffle_rqg_bugs") {
// local exchange on outer NLJ's build side because child was NLJ (not ScanNode).
// Fixed in NestedLoopJoinNode.enforceAndDeriveLocalExchange by using
// fragment.useSerialSource() instead of instanceof ScanNode check.
- // This was the root cause of 989 RQG test failures (build 183677).
+ // This was the root cause of 989 RQG test failures (RQG testing).
// ============================================================
logger.info("=== Bug 6: CROSS_JOIN shared state - nested NLJ + pooling scan (FE planner) ===")
@@ -513,7 +513,7 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
// Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched — self-join + NLJ
- // RQG case: 906784672 (build 184181)
+ // RQG regression case
// Root cause: HashJoinNode used requireGlobalExecutionHash() → GLOBAL local exchange
// inserted when use_serial_exchange=true; shuffle_idx_to_instance_idx map has only
// 4 entries (1/BE) but GLOBAL hash needs N*dop entries → most rows unrouted (0 actual rows).
@@ -522,7 +522,7 @@ suite("test_local_shuffle_rqg_bugs") {
// then NLJ (LEFT JOIN table1 table3 ON pk > col_bigint_undef_signed)
// ============================================================
- logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + NLJ (build 184181 case 906784672) ===")
+ logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + NLJ (RQG testing case 906784672) ===")
def bug10_fe = sql """
SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
enable_local_shuffle_planner=true,
@@ -564,12 +564,12 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
// Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched — FULL OUTER JOIN + GROUP BY
- // RQG case: 11007681241 (build 184181)
+ // RQG regression case
// Same root cause as Bug 10.
// SQL: FULL OUTER JOIN on col_bigint_undef_signed_not_null with WHERE + GROUP BY
// ============================================================
- logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER JOIN + GROUP BY (build 184181 case 11007681241) ===")
+ logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER JOIN + GROUP BY (RQG testing case 11007681241) ===")
def bug11_fe = sql """
SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
enable_local_shuffle_planner=true,
@@ -603,12 +603,12 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
// Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched — LEFT JOIN + VARCHAR predicates + MIN()
- // RQG case: 906784662 (build 184181)
+ // RQG regression case
// Same root cause as Bug 10/11.
// SQL: LEFT JOIN on pk with VARCHAR NOT IN / BETWEEN / IN predicates, MIN() aggregate
// ============================================================
- logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + VARCHAR predicates (build 184181 case 906784662) ===")
+ logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + VARCHAR predicates (RQG testing case 906784662) ===")
def bug12_fe = sql """
SELECT /*+SET_VAR(use_serial_exchange=true,
parallel_pipeline_task_num=4,
enable_local_shuffle_planner=true,
@@ -646,7 +646,7 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
// Bug 13: NLJ COREDUMP — serial NLJ + pooling scan + BROADCAST build side
- // RQG build 184430, query c0dafc1bed0f4910
+ // RQG testing
// Root cause: serial NLJ (RIGHT_OUTER) with pooling scan inserted BROADCAST
// local exchange on build side, inflating build pipeline num_tasks to _num_instances
// while probe pipeline stayed at 1 task. Instance 1+ created build tasks without
@@ -696,7 +696,7 @@ suite("test_local_shuffle_rqg_bugs") {
// ============================================================
// Bug 14: BUCKET_SHUFFLE join + serial build Exchange — must set shared state
- // RQG build 184563, cases 906784706/906784783/906784987/906785006
+ // RQG testing
// Root cause: BUCKET_SHUFFLE join build side ExchangeNode marked serial in
// pooling scan fragment → build pipeline num_tasks reduced to 1 →
// instance 1+ have probe tasks without build tasks → shared state injection
@@ -925,7 +925,7 @@ suite("test_local_shuffle_rqg_bugs") {
//
// Both triggered by: OVER() with no PARTITION BY + GROUPING SETS +
// pptn=0 (auto-parallel) + disable_streaming_preaggregations=true
- // RQG build 186195, query IDs: 7f3178a77c2c4b6b, 71887f7bf804c0c, 5dd9fcad234c4484
+ // RQG testing
// ============================================================
sql "DROP TABLE IF EXISTS rqg_analytic_t1"
sql """
@@ -1160,9 +1160,58 @@ suite("test_local_shuffle_rqg_bugs") {
assertTrue(false, "Bug 20: Serial exchange + agg hang: ${t.message}")
}
+ // ============================================================
+ // Bug 20b: count(distinct)+std + RIGHT JOIN returns inflated distinct count
+ // when use_serial_exchange=true + enable_local_exchange_before_agg=false.
+ // Root cause (BE-planned): AggSink early-return ignored that the serial exchange
+ // child breaks the HASH(s) invariant via PASSTHROUGH fan-out; fixed upstream by
+ // child_breaks_local_key_distribution (#63766). The FE planner fixes it
+ // structurally: requires are semantic, a hash LE is inserted instead of
+ // PASSTHROUGH. This case pins both paths.
+ // ============================================================
+ try {
+ logger.info("Bug 20b: count(distinct) under serial exchange")
+ sql "DROP TABLE IF EXISTS rqg_25413_t1"
+ sql "DROP TABLE IF EXISTS rqg_25413_t2"
+ sql """CREATE TABLE rqg_25413_t1 (pk INT NOT NULL, s VARCHAR(64) NOT NULL, d DECIMAL(10,2) NOT NULL)
+ ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+ PROPERTIES ("replication_num"="1")"""
+ sql """CREATE TABLE rqg_25413_t2 (pk INT NOT NULL, dt DATETIME NOT NULL)
+ ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+ PROPERTIES ("replication_num"="1")"""
+ sql """INSERT INTO rqg_25413_t1
+ SELECT CAST(number AS INT), concat('s', CAST(number % 29 AS INT)),
+ CAST(number * 13 % 1000 AS DECIMAL(10,2))
+ FROM numbers("number"="200")"""
+ sql """INSERT INTO rqg_25413_t2
+ SELECT CAST(number AS INT),
+ date_add('2000-01-01 00:00:00', INTERVAL CAST(number % 3000 AS INT) DAY)
+ FROM numbers("number"="200")"""
+
+ def q25413 = { vars -> """
+ SELECT /*+SET_VAR(${vars})*/
+ count(distinct t1.s) AS cnt_distinct, std(t1.d) AS std_val
+ FROM rqg_25413_t1 t1
+ RIGHT JOIN rqg_25413_t2 t2 ON t1.pk = t2.pk
+ WHERE t2.dt < '2005-01-01 00:00:00'
+ """ }
+ def base25413 = "enable_sql_cache=false, enable_local_exchange_before_agg=false, parallel_pipeline_task_num=4"
+ def expected25413 = sql q25413(base25413)
+ for (planner in ['false', 'true']) {
+ def actual = sql q25413(
+ "${base25413}, experimental_use_serial_exchange=true, enable_local_shuffle_planner=${planner}")
+ assertEquals(expected25413, actual,
+ "Bug 20b planner=${planner}: distinct count must not be inflated under serial exchange")
+ }
+ logger.info("Bug 20b: PASSED")
+ } catch (Throwable t) {
+ logger.error("Bug 20b FAILED: ${t.message}")
+ assertTrue(false, "Bug 20b: ${t.message}")
+ }
+
// ============================================================
// Bug 21: Multi-distinct COUNT on many-bucket table → COREDUMP
- // RQG build 186737/186929/186952: AggSinkOperatorX::sink → set_ready_to_read
+ // AggSinkOperatorX::sink → set_ready_to_read
// with empty source_deps.
//
// Root cause: AGG operators (streaming, distinct-streaming, serialize) requested
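The Bug 20b comment attributes the inflated `count(distinct)` to PASSTHROUGH fan-out breaking the HASH(s) invariant. The toy model below shows why that invariant matters when per-instance distinct counts are added together. This is an assumption about the merge step, made for illustration. Round-robin routing lets one value reach several instances, so that value is counted more than once. Hash routing on the value keeps the sum exact. It is a hypothetical Java sketch, not the BE's aggregation code: the class, methods, and instance count are assumptions, and the sample column only mirrors `rqg_25413_t1.s`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model (not Doris code): summing per-instance distinct counts
// is only correct when equal values are co-located on one instance.
public class DistinctFanOutSketch {
    static final int INSTANCES = 4; // hypothetical local parallelism

    // Mirrors rqg_25413_t1.s: concat('s', number % 29) over 200 rows -> 29 distinct values.
    static List<String> sampleRows() {
        List<String> rows = new ArrayList<>();
        for (int n = 0; n < 200; n++) {
            rows.add("s" + (n % 29));
        }
        return rows;
    }

    // Routes rows to instances, then adds up each instance's local distinct count.
    static int sumOfLocalDistinct(List<String> rows, boolean hashRoute) {
        List<Set<String>> local = new ArrayList<>();
        for (int i = 0; i < INSTANCES; i++) {
            local.add(new HashSet<>());
        }
        for (int r = 0; r < rows.size(); r++) {
            String s = rows.get(r);
            // Round-robin (PASSTHROUGH-like) ignores the value; hash routing
            // sends equal values to the same instance.
            int target = hashRoute ? Math.floorMod(s.hashCode(), INSTANCES) : r % INSTANCES;
            local.get(target).add(s);
        }
        int total = 0;
        for (Set<String> set : local) {
            total += set.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> rows = sampleRows();
        System.out.println("true distinct:   " + new HashSet<>(rows).size());
        System.out.println("round-robin sum: " + sumOfLocalDistinct(rows, false));
        System.out.println("hash-routed sum: " + sumOfLocalDistinct(rows, true));
    }
}
```

With this sample, every value appears at least six times and lands on all four instances under round-robin, so the summed count is 116 instead of 29. Hash routing returns 29. This is the reason the FE planner inserts a hash local exchange rather than PASSTHROUGH.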