github-actions[bot] commented on code in PR #62054:
URL: https://github.com/apache/doris/pull/62054#discussion_r3032496595


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java:
##########
@@ -231,33 +230,53 @@ private void linkPipelinePlan(
 
     private List<AssignedJob> getDestinationsByBuckets(
             PipelineDistributedPlan joinSide,
-            List<AssignedJob> receiverInstances) {
+            List<AssignedJob> receiverInstances,
+            boolean useJoinBucketAssignment) {
         UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
-        return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
+        return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum,
+                useJoinBucketAssignment);
     }
 
     private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
             PipelineDistributedPlan receiverPlan,
-            boolean enableShareHashTableForBroadcastJoin,
             ExchangeNode linkNode) {
-        boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
-                .anyMatch(LocalShuffleAssignedJob.class::isInstance);
-        if (useLocalShuffle) {
-            return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
-        } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
+        // isSerialOperator(): UNPARTITIONED or use_serial_exchange (operator-level)
+        // useSerialSource(): fragment is in pooling mode (fragment-level guard)

Review Comment:
   `filterInstancesWhichCanReceiveDataFromRemote()` used to collapse
destinations in two cases: for local-shuffle serial exchanges, and when both
`enableShareHashTableForBroadcastJoin` and
`linkNode.isRightChildOfBroadcastHashJoin()` were true. The second case is
still part of the execution contract: `PhysicalPlanTranslator` still marks the
right side of broadcast joins
(`fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:1705-1710`),
and the legacy coordinator still sends only one destination per BE while the
sibling instances share that hash table
(`fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:1451-1470`,
`1594-1614`).

After this change, Nereids adds every local instance as a destination for
those exchanges, so the build side is broadcast to every pipeline instance
again. That reintroduces the per-instance network and build cost that the
shared-hash-table optimization avoids for ordinary broadcast joins, even
though this PR only aims to relax bucket-shuffle seriality. Please preserve
the broadcast-join dedupe branch here (or fold it into the new predicate)
alongside the bucket-shuffle fix.
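
A minimal sketch of what "fold it into the new predicate" could look like,
written against simplified stand-in types rather than the real Doris classes.
`Instance`, `filterReceivers`, and the `serialExchange` flag are hypothetical
names used only for illustration; `serialExchange` stands for whatever
condition the PR derives from `isSerialOperator()` and `useSerialSource()`,
whose exact combination is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: simplified stand-ins, not the actual DistributePlanner code.
class ReceiverFilterSketch {

    // Hypothetical stand-in for AssignedJob: a worker (BE) id plus an index within it.
    static final class Instance {
        final long workerId;
        final int indexInWorker;

        Instance(long workerId, int indexInWorker) {
            this.workerId = workerId;
            this.indexInWorker = indexInWorker;
        }
    }

    // Keep only the first instance seen on each worker, preserving input order.
    static List<Instance> firstInstancePerWorker(List<Instance> instances) {
        Map<Long, Instance> firstByWorker = new LinkedHashMap<>();
        for (Instance instance : instances) {
            firstByWorker.putIfAbsent(instance.workerId, instance);
        }
        return new ArrayList<>(firstByWorker.values());
    }

    // serialExchange is an assumed placeholder for the PR's new serial-exchange check.
    // The broadcast-join condition is OR-ed in so the dedupe branch is not lost.
    static List<Instance> filterReceivers(
            List<Instance> instances,
            boolean serialExchange,
            boolean enableShareHashTableForBroadcastJoin,
            boolean rightChildOfBroadcastHashJoin) {
        boolean sharedBroadcastBuild =
                enableShareHashTableForBroadcastJoin && rightChildOfBroadcastHashJoin;
        if (serialExchange || sharedBroadcastBuild) {
            return firstInstancePerWorker(instances);
        }
        return instances;
    }
}
```

With two workers running two instances each, this sketch returns two
destinations when either condition holds and all four otherwise, which is the
behavior the removed `else if` branch provided for broadcast joins.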



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

