morningman commented on a change in pull request #4677:
URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495541353
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -390,6 +390,28 @@ private PlanFragment createHashJoinFragment(HashJoinNode
node, PlanFragment righ
node.setColocate(false, reason.get(0));
}
+ // bucket shuffle join is better than boradcast and shuffle join
Review comment:
```suggestion
// bucket shuffle join is better than broadcast and shuffle join
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node,
PlanFragment leftChildFragmen
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment
leftChildFragment,
+ List<Expr> rhsHashExprs) {
+ if
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ // If user have a join hint to use proper way of join, can not be
colocate join
Review comment:
```suggestion
// If user have a join hint to use proper way of join, can not be
bucket join
```
##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
##########
@@ -1399,6 +1429,175 @@ public boolean isDone() {
}
+ class BucketShuffleJoinController {
Review comment:
Please add a comment describing the purpose of this class.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node,
PlanFragment leftChildFragmen
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment
leftChildFragment,
+ List<Expr> rhsHashExprs) {
+ if
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ // If user have a join hint to use proper way of join, can not be
colocate join
+ if (node.getInnerRef().hasJoinHints()) {
+ return false;
+ }
+
+ PlanNode leftRoot = leftChildFragment.getPlanRoot();
+ //leftRoot should be ScanNode or HashJoinNode, rightRoot should be
ScanNode
Review comment:
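This comment does not match the code: only an OlapScanNode is accepted as
leftRoot here (a HashJoinNode is not), so please update the comment.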
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node,
PlanFragment leftChildFragmen
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment
leftChildFragment,
+ List<Expr> rhsHashExprs) {
+ if
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ // If user have a join hint to use proper way of join, can not be
colocate join
Review comment:
Also, I think that if the user specifies a [SHUFFLE] hint, we should still
try to do a bucket shuffle join.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node,
PlanFragment leftChildFragmen
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment
leftChildFragment,
+ List<Expr> rhsHashExprs) {
+ if
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
+ return false;
+ }
+ // If user have a join hint to use proper way of join, can not be
colocate join
+ if (node.getInnerRef().hasJoinHints()) {
+ return false;
+ }
+
+ PlanNode leftRoot = leftChildFragment.getPlanRoot();
+ //leftRoot should be ScanNode or HashJoinNode, rightRoot should be
ScanNode
+ if (leftRoot instanceof OlapScanNode) {
+ return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+ }
+
+ return false;
+ }
+
+ //the join expr must contian left table distribute column
+ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
+ List<Expr> rhsJoinExprs) {
+ OlapScanNode leftScanNode = ((OlapScanNode) leftRoot);
+
+ //1 the left table must be only one partition
+ if (leftScanNode.getSelectedPartitionIds().size() > 1) {
+ return false;
+ }
+
+ DistributionInfo leftDistribution =
leftScanNode.getOlapTable().getDefaultDistributionInfo();
+
+ if (leftDistribution instanceof HashDistributionInfo ) {
+ List<Column> leftDistributeColumns = ((HashDistributionInfo)
leftDistribution).getDistributionColumns();
+
+ List<Column> leftJoinColumns = new ArrayList<>();
+ List<Expr> rightExprs = new ArrayList<>();
+ List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+ for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+ Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+ Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+ if (lhsJoinExpr.unwrapSlotRef() == null ||
rhsJoinExpr.unwrapSlotRef() == null) {
+ continue;
+ }
+
+ SlotDescriptor leftSlot =
lhsJoinExpr.unwrapSlotRef().getDesc();
+
+ leftJoinColumns.add(leftSlot.getColumn());
+ rightExprs.add(rhsJoinExpr);
+ }
+
+ //2 the join columns should contains all left table distribute
columns to enable bucket shuffle join
+ for (Column distributeColumn : leftDistributeColumns) {
+ int loc = leftJoinColumns.indexOf(distributeColumn);
+ // TODO: now support bucket shuffle join when distribute
column type different with
+ // right expr type
+ if (loc == -1 ||
!rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
+ return false;
+ }
+ rhsJoinExprs.add(rightExprs.get(loc));
+ }
+ }
Review comment:
```suggestion
} else {
return false;
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]