zzcclp commented on a change in pull request #1601: URL: https://github.com/apache/kylin/pull/1601#discussion_r635062296
########## File path: kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.spark.sql.execution + +import javax.annotation.concurrent.GuardedBy +import org.apache.kylin.common.KylinConfig +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{JoinHint, LogicalPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.{SparkSession, Strategy} + +/** + * Select the proper physical plan for join based on joining keys and size of logical plan. + * + * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the + * predicates can be evaluated by matching join keys. If found, join implementations are chosen + * with the following precedence: + * + * - Broadcast hash join (BHJ): + * BHJ is not supported for full outer join. For right outer join, we only can broadcast the + * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin, + * we only can broadcast the right side. For inner like join, we can broadcast both sides. + * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is + * small. However, broadcasting tables is a network-intensive operation. It could cause OOM + * or perform worse than the other join algorithms, especially when the build/broadcast side + * is big. + * + * For the supported cases, users can specify the broadcast hint (e.g. the user applied the + * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based + * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and + * which join side is broadcast. + * + * 1) Broadcast the join side with the broadcast hint, even if the size is larger than + * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type + * is inner like join), the side with a smaller estimated physical size will be broadcast. + * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side + * whose estimated physical size is smaller than the threshold. If both sides are below the + * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used. + * + * - Shuffle hash join: if the average size of a single partition is small enough to build a hash + * table. + * + * - Sort merge: if the matching join keys are sortable. + * + * If there is no joining keys, Join implementations are chosen with the following precedence: + * - BroadcastNestedLoopJoin (BNLJ): + * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios: + * For right outer join, the left side is broadcast. For left outer, left semi, left anti + * and the internal join type ExistenceJoin, the right side is broadcast. For inner like + * joins, either side is broadcast. + * + * Like BHJ, users still can specify the broadcast hint and session-based + * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast. + * + * 1) Broadcast the join side with the broadcast hint, even if the size is larger than + * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for + * inner-like join), the side with a smaller estimated physical size will be broadcast. + * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side + * whose estimated physical size is smaller than the threshold. If both sides are below the + * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used. + * + * - CartesianProduct: for inner like join, CartesianProduct is the fallback option. + * + * - BroadcastNestedLoopJoin (BNLJ): + * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast + * side with the broadcast hint. If neither side has a hint, we broadcast the side with + * the smaller estimated physical size. + */ +case class KylinJoinSelection(session: SparkSession) extends Strategy with PredicateHelper with Logging { + + val conf: SQLConf = session.sessionState.conf + + /** + * Matches a plan whose output should be small enough to be used in broadcast join. + */ + private def canBroadcast(plan: LogicalPlan): Boolean = { + val sizeInBytes = plan.stats.sizeInBytes + sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(sizeInBytes.toLong) + } + + /** + * Matches a plan whose single partition should be small enough to build a hash table. + * + * Note: this assume that the number of partition is fixed, requires additional work if it's + * dynamic. + */ + private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = { + plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions + } + + /** + * Returns whether plan a is much smaller (3X) than plan b. + * + * The cost to build hash map is higher than sorting, we should only build hash map on a table + * that is much smaller than other one. Since we does not have the statistic for number of rows, + * use the size of bytes here as estimation. + */ + private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { + a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes + } + + private def canBuildRight(joinType: JoinType): Boolean = joinType match { + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true + case _ => false + } + + private def canBuildLeft(joinType: JoinType): Boolean = joinType match { + case _: InnerLike | RightOuter => true + case _ => false + } + + private def broadcastSide( + canBuildLeft: Boolean, + canBuildRight: Boolean, + left: LogicalPlan, + right: LogicalPlan): BuildSide = { + + def smallerSide = + if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft + + if (canBuildRight && canBuildLeft) { + // Broadcast smaller side base on its estimated physical size + // if both sides have broadcast hint + smallerSide + } else if (canBuildRight) { + BuildRight + } else if (canBuildLeft) { + BuildLeft + } else { + // for the last default broadcast nested loop join + smallerSide + } + } + + private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint) + : Boolean = { +// val buildLeft = canBuildLeft(joinType) && logical.BROADCAST.equals(hint.leftHint.get.strategy.get) +// val buildRight = canBuildRight(joinType) && logical.BROADCAST.equals(hint.rightHint.get.strategy.get) +// buildLeft || buildRight + false + } + + private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint) + : BuildSide = { +// val buildLeft = canBuildLeft(joinType) && logical.BROADCAST.equals(hint.leftHint.get.strategy.get) +// val buildRight = canBuildRight(joinType) && logical.BROADCAST.equals(hint.rightHint.get.strategy.get) Review comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
