morningman commented on a change in pull request #5521:
URL: https://github.com/apache/incubator-doris/pull/5521#discussion_r608779297
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -294,16 +298,52 @@ private boolean isBroadcastCostSmaller(long
broadcastCost, long partitionCost)
}
/**
- * Creates either a broadcast join or a repartitioning join, depending on
the expected cost. If any of the inputs to
- * the cost computation is unknown, it assumes the cost will be 0. Costs
being equal, it'll favor partitioned over
- * broadcast joins. If perNodeMemLimit > 0 and the size of the hash table
for a broadcast join is expected to exceed
- * that mem limit, switches to partitioned join instead. TODO: revisit the
choice of broadcast as the default TODO:
- * don't create a broadcast join if we already anticipate that this will
exceed the query's memory budget.
+ * There are 4 kinds of distributed hash join methods in Doris:
+ * Colocate, Bucket Shuffle, Broadcast, Shuffle
+ * The priority between these four distributed execution methods is
following:
+ * Colocate > Bucket Shuffle > Broadcast > Shuffle
+ * This function is mainly used to choose the most suitable distributed
method for the 'node',
+ * and transform it into PlanFragment.
*/
private PlanFragment createHashJoinFragment(HashJoinNode node,
PlanFragment rightChildFragment,
PlanFragment
leftChildFragment, long perNodeMemLimit,
ArrayList<PlanFragment>
fragments)
throws UserException {
+ List<String> reason = Lists.newArrayList();
+ if (canColocateJoin(node, leftChildFragment, rightChildFragment,
reason)) {
+ node.setColocate(true, "");
+
//node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
Review comment:
Please remove this commented-out code.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode
node, PlanFragment righ
rightChildFragment.setDestination(rhsExchange);
rightChildFragment.setOutputPartition(rhsJoinPartition);
- // Before we support global runtime filter, only shuffle join do
not enable local runtime filter
+ // TODO: Before we support global runtime filter, only shuffle
join do not enable local runtime filter
node.setIsPushDown(false);
return joinFragment;
}
}
+ /**
+ * Colocate Join can be performed when the following 4 conditions are met
at the same time.
+ * 1. Session variables disable_colocate_plan = true
+ * 2. There is no join hints in HashJoinNode
+ * 3. There are no exchange node between source scan node and HashJoinNode.
+ * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are
colocate and group can be matched.
+ */
private boolean canColocateJoin(HashJoinNode node, PlanFragment
leftChildFragment, PlanFragment rightChildFragment,
- List<String> cannotReason) {
- if (Config.disable_colocate_join) {
- cannotReason.add("Disabled");
+ List<String> cannotReason) {
+ // Condition1
+ if (ConnectContext.get().getSessionVariable().isDisableColocatePlan())
{
+ cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED);
return false;
}
- if (ConnectContext.get().getSessionVariable().isDisableColocateJoin())
{
- cannotReason.add("Session disabled");
+ // Condition2: If user have a join hint to use proper way of join, can
not be colocate join
+ if (node.getInnerRef().hasJoinHints()) {
+ cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT);
return false;
}
- // If user have a join hint to use proper way of join, can not be
colocate join
- if (node.getInnerRef().hasJoinHints()) {
- cannotReason.add("Has join hint");
- return false;
+ // Condition3:
+ // If there is an exchange node between the HashJoinNode and their
real associated ScanNode,
+ // it means that the data has been rehashed.
+ // The rehashed data can no longer be guaranteed to correspond to the
left and right buckets,
+ // and naturally cannot be colocate
+ Map<Pair<OlapScanNode, OlapScanNode>, List<BinaryPredicate>>
scanNodeWithJoinConjuncts = Maps.newHashMap();
+ for (BinaryPredicate eqJoinPredicate : node.getEqJoinConjuncts()) {
+ OlapScanNode leftScanNode =
genSrcScanNode(eqJoinPredicate.getChild(0), leftChildFragment, cannotReason);
+ if (leftScanNode == null) {
+ return false;
+ }
+ OlapScanNode rightScanNode =
genSrcScanNode(eqJoinPredicate.getChild(1), rightChildFragment, cannotReason);
+ if (rightScanNode == null) {
+ return false;
+ }
+ Pair<OlapScanNode, OlapScanNode> eqPair = new Pair<>(leftScanNode,
rightScanNode);
+ List<BinaryPredicate> predicateList =
scanNodeWithJoinConjuncts.get(eqPair);
+ if (predicateList == null) {
+ predicateList = Lists.newArrayList();
+ scanNodeWithJoinConjuncts.put(eqPair, predicateList);
+ }
+ predicateList.add(eqJoinPredicate);
}
- PlanNode leftRoot = leftChildFragment.getPlanRoot();
- PlanNode rightRoot = rightChildFragment.getPlanRoot();
+ // Condition4
+ return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts,
cannotReason);
+ }
- //leftRoot should be ScanNode or HashJoinNode, rightRoot should be
ScanNode
- if (leftRoot instanceof OlapScanNode && rightRoot instanceof
OlapScanNode) {
- return canColocateJoin(node, leftRoot, rightRoot, cannotReason);
+ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment,
List<String> cannotReason) {
+ SlotRef slotRef = expr.getSrcSlotRef();
+ if (slotRef == null) {
+
cannotReason.add(DistributedPlanColocateRule.TRANSFORMED_SRC_COLUMN);
+ return null;
+ }
+ ScanNode scanNode = planFragment.getPlanRoot()
+
.getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId());
+ if (scanNode == null) {
+
cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA);
+ return null;
}
+ if (scanNode instanceof OlapScanNode) {
+ return ((OlapScanNode) scanNode);
Review comment:
```suggestion
return (OlapScanNode) scanNode;
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -1175,8 +1207,9 @@ private PlanFragment createAnalyticFragment(
// required if the sort partition exprs reference a tuple that is
made nullable in
// 'childFragment' to bring NULLs from outer-join non-matches
together.
DataPartition sortPartition = sortNode.getInputPartition();
+ // TODO(ML): here
Review comment:
What is this TODO for? Please describe the pending work or remove the comment.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -812,6 +843,7 @@ private PlanFragment createSetOperationNodeFragment(
}
// There is at least one partitioned child fragment.
+ // TODO(ML): here
Review comment:
TODO for what? Please describe the pending work or remove the comment.
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode
node, PlanFragment righ
rightChildFragment.setDestination(rhsExchange);
rightChildFragment.setOutputPartition(rhsJoinPartition);
- // Before we support global runtime filter, only shuffle join do
not enable local runtime filter
+ // TODO: Before we support global runtime filter, only shuffle
join do not enable local runtime filter
node.setIsPushDown(false);
return joinFragment;
}
}
+ /**
+ * Colocate Join can be performed when the following 4 conditions are met
at the same time.
+ * 1. Session variables disable_colocate_plan = true
Review comment:
```suggestion
* 1. Session variables disable_colocate_plan = false
```
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -932,9 +964,9 @@ private PlanFragment createAggregationFragment(
if (isDistinct) {
return createPhase2DistinctAggregationFragment(node,
childFragment, fragments);
} else {
-
// Check table's distribution. See #4481.
PlanNode childPlan = childFragment.getPlanRoot();
+ // TODO(ML): here
Review comment:
What is this TODO for? Please describe the pending work or remove the comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]