github-actions[bot] commented on code in PR #62054:
URL: https://github.com/apache/doris/pull/62054#discussion_r3032496595
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java:
##########
@@ -231,33 +230,53 @@ private void linkPipelinePlan(
private List<AssignedJob> getDestinationsByBuckets(
PipelineDistributedPlan joinSide,
- List<AssignedJob> receiverInstances) {
+ List<AssignedJob> receiverInstances,
+ boolean useJoinBucketAssignment) {
UnassignedScanBucketOlapTableJob bucketJob =
(UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
- return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
+ return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum,
+ useJoinBucketAssignment);
}
private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
PipelineDistributedPlan receiverPlan,
- boolean enableShareHashTableForBroadcastJoin,
ExchangeNode linkNode) {
- boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
- .anyMatch(LocalShuffleAssignedJob.class::isInstance);
- if (useLocalShuffle) {
- return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
- } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
+ // isSerialOperator(): UNPARTITIONED or use_serial_exchange (operator-level)
+ // useSerialSource(): fragment is in pooling mode (fragment-level guard)
Review Comment:
`filterInstancesWhichCanReceiveDataFromRemote()` used to collapse destinations in two cases: for local-shuffle serial exchanges, and when both `enableShareHashTableForBroadcastJoin` and `linkNode.isRightChildOfBroadcastHashJoin()` were true. That second branch is still part of the execution contract: `PhysicalPlanTranslator` still marks the right side of broadcast joins (`fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:1705-1710`), and the legacy coordinator still sends only one destination per BE while the sibling instances share that hash table (`fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:1451-1470`, `1594-1614`).

After this change, Nereids adds every local instance as a destination for those exchanges, so the build side is broadcast to every pipeline instance again. That reintroduces the per-instance network and build cost that the shared-hash-table optimization avoided for ordinary broadcast joins, even though this PR only aims to relax bucket-shuffle seriality. Please preserve the broadcast-join dedupe branch here (or fold it into the new predicate) alongside the bucket-shuffle fix.
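
To illustrate what "fold it into the new predicate" could look like, here is a minimal, self-contained sketch. It is not the Doris code: `Instance`, `firstInstancePerWorker`, and `filterReceivers` are hypothetical stand-ins for `AssignedJob`, `getFirstInstancePerWorker`, and `filterInstancesWhichCanReceiveDataFromRemote`, and the `serialExchange` flag is an assumed placeholder for whatever the new `isSerialOperator()` / `useSerialSource()` condition evaluates to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BroadcastDestinationSketch {
    // Hypothetical stand-in for AssignedJob: one pipeline instance on one backend.
    public static final class Instance {
        public final long workerId;
        public final int indexInFragment;

        public Instance(long workerId, int indexInFragment) {
            this.workerId = workerId;
            this.indexInFragment = indexInFragment;
        }
    }

    // Keep only the first instance seen for each worker, preserving input order.
    public static List<Instance> firstInstancePerWorker(List<Instance> instances) {
        Map<Long, Instance> firstByWorker = new LinkedHashMap<>();
        for (Instance instance : instances) {
            firstByWorker.putIfAbsent(instance.workerId, instance);
        }
        return new ArrayList<>(firstByWorker.values());
    }

    // Assumed shape of the combined predicate: collapse destinations either for a
    // serial exchange or for the right child of a broadcast join that shares its
    // hash table across sibling instances.
    public static List<Instance> filterReceivers(
            List<Instance> instances,
            boolean serialExchange,
            boolean shareHashTableForBroadcastJoin,
            boolean rightChildOfBroadcastHashJoin) {
        boolean dedupeBroadcast = shareHashTableForBroadcastJoin && rightChildOfBroadcastHashJoin;
        if (serialExchange || dedupeBroadcast) {
            return firstInstancePerWorker(instances);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Instance> all = Arrays.asList(
                new Instance(1L, 0), new Instance(1L, 1), new Instance(2L, 2));
        // Broadcast build side with shared hash table: one destination per worker.
        System.out.println(filterReceivers(all, false, true, true).size());
        // Ordinary exchange: every instance remains a destination.
        System.out.println(filterReceivers(all, false, false, false).size());
    }
}
```

With two instances on worker 1 and one on worker 2, the broadcast case keeps one destination per worker while the ordinary case keeps all three, which is the behavior the removed branch provided.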
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]