This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 39480ad7c refactor reordering join tables (#6854)
39480ad7c is described below

commit 39480ad7c08a0070cec2a9e8f0b2dc0112a9aaae
Author: lgbo <[email protected]>
AuthorDate: Fri Aug 16 15:47:43 2024 +0800

    refactor reordering join tables (#6854)
---
 .../gluten/vectorized/StorageJoinBuilder.java      |   5 +-
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  25 ++++
 .../clickhouse/CHSparkPlanExecApi.scala            |   4 +-
 .../CHBroadcastNestedLoopJoinExecTransformer.scala |   3 +-
 .../execution/CHHashJoinExecTransformer.scala      |  45 +++++--
 .../gluten/extension/ReorderJoinTablesRule.scala   | 149 ---------------------
 .../benchmarks/CHHashBuildBenchmark.scala          |   2 +-
 .../extension/columnar/OffloadSingleNode.scala     |   8 +-
 8 files changed, 72 insertions(+), 169 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
index ae7b89120..1c4c1302d 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.vectorized;
 
 import org.apache.gluten.execution.BroadCastHashJoinContext;
+import org.apache.gluten.execution.JoinTypeTransform;
 import org.apache.gluten.expression.ConverterUtils;
 import org.apache.gluten.expression.ConverterUtils$;
 import org.apache.gluten.substrait.type.TypeNode;
@@ -80,7 +81,9 @@ public class StorageJoinBuilder {
     if 
(broadCastContext.buildHashTableId().startsWith("BuiltBNLJBroadcastTable-")) {
       joinType = 
SubstraitUtil.toCrossRelSubstrait(broadCastContext.joinType()).ordinal();
     } else {
-      joinType = 
SubstraitUtil.toSubstrait(broadCastContext.joinType()).ordinal();
+      boolean buildRight = broadCastContext.buildRight();
+      joinType =
+          JoinTypeTransform.toSubstraitJoinType(broadCastContext.joinType(), 
buildRight).ordinal();
     }
 
     return nativeBuild(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 4677a28e6..9884a0c6e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
@@ -382,4 +383,28 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
 
   override def supportCartesianProductExec(): Boolean = true
 
+  override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
+    t =>
+      if (super.supportHashBuildJoinTypeOnLeft(t)) {
+        true
+      } else {
+        t match {
+          case LeftOuter => true
+          case _ => false
+        }
+      }
+  }
+
+  override def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
+    t =>
+      if (super.supportHashBuildJoinTypeOnRight(t)) {
+        true
+      } else {
+        t match {
+          case RightOuter => true
+          case _ => false
+        }
+      }
+  }
+
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 5a49d6ea3..8fdc2645a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, 
SparkPlanExecApi}
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
-import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, 
CountDistinctWithoutExpand, FallbackBroadcastHashJoin, 
FallbackBroadcastHashJoinPrepQueryStage, ReorderJoinTablesRule, 
RewriteDateTimestampComparisonRule, RewriteSortMergeJoinToHashJoinRule, 
RewriteToDateExpresstionRule}
+import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, 
CountDistinctWithoutExpand, FallbackBroadcastHashJoin, 
FallbackBroadcastHashJoinPrepQueryStage, RewriteDateTimestampComparisonRule, 
RewriteSortMergeJoinToHashJoinRule, RewriteToDateExpresstionRule}
 import org.apache.gluten.extension.columnar.AddFallbackTagRule
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
 import org.apache.gluten.extension.columnar.transition.Convention
@@ -606,7 +606,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
    * @return
    */
   override def genExtendedColumnarTransformRules(): List[SparkSession => 
Rule[SparkPlan]] =
-    List(spark => RewriteSortMergeJoinToHashJoinRule(spark), spark => 
ReorderJoinTablesRule(spark))
+    List(spark => RewriteSortMergeJoinToHashJoinRule(spark))
 
   override def genInjectPostHocResolutionRules(): List[SparkSession => 
Rule[LogicalPlan]] = {
     List()
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
index 3aab5a6eb..abd87468f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftSemi}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
@@ -68,6 +68,7 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
       BroadCastHashJoinContext(
         Seq.empty,
         finalJoinType,
+        buildSide == BuildRight,
         false,
         joinType.isInstanceOf[ExistenceJoin],
         buildPlan.output,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index adb824804..2dd45281e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -43,29 +43,39 @@ object JoinTypeTransform {
     }
   }
 
-  def toSubstraitType(joinType: JoinType, buildSide: BuildSide): 
JoinRel.JoinType = {
-    joinType match {
+  def toSubstraitJoinType(sparkJoin: JoinType, buildRight: Boolean): 
JoinRel.JoinType =
+    sparkJoin match {
       case _: InnerLike =>
         JoinRel.JoinType.JOIN_TYPE_INNER
       case FullOuter =>
         JoinRel.JoinType.JOIN_TYPE_OUTER
       case LeftOuter =>
-        JoinRel.JoinType.JOIN_TYPE_LEFT
-      case RightOuter if (buildSide == BuildLeft) =>
-        // The tables order will be reversed in HashJoinLikeExecTransformer
-        JoinRel.JoinType.JOIN_TYPE_LEFT
-      case RightOuter if (buildSide == BuildRight) =>
-        // This the case rewritten in ReorderJoinLeftRightRule
-        JoinRel.JoinType.JOIN_TYPE_RIGHT
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT
+        }
+      case RightOuter =>
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_LEFT
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT
+        }
       case LeftSemi | ExistenceJoin(_) =>
+        if (!buildRight) {
+          throw new IllegalArgumentException("LeftSemi join should not switch 
children")
+        }
         JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
       case LeftAnti =>
+        if (!buildRight) {
+          throw new IllegalArgumentException("LeftAnti join should not switch 
children")
+        }
         JoinRel.JoinType.JOIN_TYPE_ANTI
       case _ =>
         // TODO: Support cross join with Cross Rel
         JoinRel.JoinType.UNRECOGNIZED
     }
-  }
+
 }
 
 case class CHShuffledHashJoinExecTransformer(
@@ -104,8 +114,6 @@ case class CHShuffledHashJoinExecTransformer(
     super.doValidateInternal()
   }
   private val finalJoinType = JoinTypeTransform.toNativeJoinType(joinType)
-  override protected lazy val substraitJoinType: JoinRel.JoinType =
-    JoinTypeTransform.toSubstraitType(joinType, buildSide)
 
   override def genJoinParameters(): Any = {
     val (isBHJ, isNullAwareAntiJoin, buildHashTableId): (Int, Int, String) = 
(0, 0, "")
@@ -162,6 +170,12 @@ case class CHShuffledHashJoinExecTransformer(
       .build()
     BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
   }
+
+  override protected lazy val substraitJoinType: JoinRel.JoinType = {
+    val res = JoinTypeTransform.toSubstraitJoinType(joinType, buildSide == 
BuildRight)
+    logDebug(s"Convert join type from: $joinType:$buildSide to $res 
$needSwitchChildren")
+    res
+  }
 }
 
 case class CHBroadcastBuildSideRDD(
@@ -179,6 +193,7 @@ case class CHBroadcastBuildSideRDD(
 case class BroadCastHashJoinContext(
     buildSideJoinKeys: Seq[Expression],
     joinType: JoinType,
+    buildRight: Boolean,
     hasMixedFiltCondition: Boolean,
     isExistenceJoin: Boolean,
     buildSideStructure: Seq[Attribute],
@@ -241,6 +256,7 @@ case class CHBroadcastHashJoinExecTransformer(
       BroadCastHashJoinContext(
         buildKeyExprs,
         finalJoinType,
+        buildSide == BuildRight,
         isMixedCondition(condition),
         joinType.isInstanceOf[ExistenceJoin],
         buildPlan.output,
@@ -268,6 +284,7 @@ case class CHBroadcastHashJoinExecTransformer(
   // We don't have left any join in substrait, so use left semi join instead.
   // and isExistenceJoin is set to true to indicate that it is an existence 
join.
   private val finalJoinType = JoinTypeTransform.toNativeJoinType(joinType)
-  override protected lazy val substraitJoinType: JoinRel.JoinType =
-    JoinTypeTransform.toSubstraitType(joinType, buildSide)
+  override protected lazy val substraitJoinType: JoinRel.JoinType = {
+    JoinTypeTransform.toSubstraitJoinType(joinType, buildSide == BuildRight)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ReorderJoinTablesRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ReorderJoinTablesRule.scala
deleted file mode 100644
index 4cedaae25..000000000
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ReorderJoinTablesRule.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
-import org.apache.gluten.execution._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.optimizer._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive._
-
-case class ReorderJoinTablesRule(session: SparkSession) extends 
Rule[SparkPlan] with Logging {
-  override def apply(plan: SparkPlan): SparkPlan = {
-    if (CHBackendSettings.enableReorderHashJoinTables) {
-      visitPlan(plan)
-    } else {
-      plan
-    }
-  }
-
-  private def visitPlan(plan: SparkPlan): SparkPlan = {
-    plan match {
-      case hashShuffle: ColumnarShuffleExchangeExec =>
-        hashShuffle.withNewChildren(hashShuffle.children.map(visitPlan))
-      case hashJoin: CHShuffledHashJoinExecTransformer =>
-        val newHashJoin = reorderHashJoin(hashJoin)
-        newHashJoin.withNewChildren(newHashJoin.children.map(visitPlan))
-      case _ =>
-        plan.withNewChildren(plan.children.map(visitPlan))
-    }
-  }
-
-  private def reorderHashJoin(hashJoin: CHShuffledHashJoinExecTransformer): 
SparkPlan = {
-    val leftQueryStageRow = childShuffleQueryStageRows(hashJoin.left)
-    val rightQueryStageRow = childShuffleQueryStageRows(hashJoin.right)
-    if (leftQueryStageRow == None || rightQueryStageRow == None) {
-      logError(s"Cannot reorder this hash join. Its children is not 
ShuffleQueryStageExec")
-      hashJoin
-    } else {
-      val threshold = CHBackendSettings.reorderHashJoinTablesThreshold
-      val isLeftLarger = leftQueryStageRow.get > rightQueryStageRow.get * 
threshold
-      val isRightLarger = leftQueryStageRow.get * threshold < 
rightQueryStageRow.get
-      hashJoin.joinType match {
-        case Inner =>
-          if (isRightLarger && hashJoin.buildSide == BuildRight) {
-            CHShuffledHashJoinExecTransformer(
-              hashJoin.rightKeys,
-              hashJoin.leftKeys,
-              hashJoin.joinType,
-              hashJoin.buildSide,
-              hashJoin.condition,
-              hashJoin.right,
-              hashJoin.left,
-              hashJoin.isSkewJoin)
-          } else if (isLeftLarger && hashJoin.buildSide == BuildLeft) {
-            CHShuffledHashJoinExecTransformer(
-              hashJoin.leftKeys,
-              hashJoin.rightKeys,
-              hashJoin.joinType,
-              BuildRight,
-              hashJoin.condition,
-              hashJoin.left,
-              hashJoin.right,
-              hashJoin.isSkewJoin)
-          } else {
-            hashJoin
-          }
-        case LeftOuter =>
-          // left outer + build right is the common case,other cases have not 
been covered by tests
-          // and don't reroder them.
-          if (isRightLarger && hashJoin.buildSide == BuildRight) {
-            CHShuffledHashJoinExecTransformer(
-              hashJoin.rightKeys,
-              hashJoin.leftKeys,
-              RightOuter,
-              BuildRight,
-              hashJoin.condition,
-              hashJoin.right,
-              hashJoin.left,
-              hashJoin.isSkewJoin)
-          } else {
-            hashJoin
-          }
-        case RightOuter =>
-          // right outer + build left is the common case,other cases have not 
been covered by tests
-          // and don't reroder them.
-          if (isLeftLarger && hashJoin.buildSide == BuildLeft) {
-            CHShuffledHashJoinExecTransformer(
-              hashJoin.leftKeys,
-              hashJoin.rightKeys,
-              RightOuter,
-              BuildRight,
-              hashJoin.condition,
-              hashJoin.left,
-              hashJoin.right,
-              hashJoin.isSkewJoin)
-          } else if (isRightLarger && hashJoin.buildSide == BuildLeft) {
-            CHShuffledHashJoinExecTransformer(
-              hashJoin.rightKeys,
-              hashJoin.leftKeys,
-              LeftOuter,
-              BuildRight,
-              hashJoin.condition,
-              hashJoin.right,
-              hashJoin.left,
-              hashJoin.isSkewJoin)
-          } else {
-            hashJoin
-          }
-        case _ => hashJoin
-      }
-    }
-  }
-
-  private def childShuffleQueryStageRows(plan: SparkPlan): Option[BigInt] = {
-    plan match {
-      case queryStage: ShuffleQueryStageExec =>
-        queryStage.getRuntimeStatistics.rowCount
-      case _: ColumnarBroadcastExchangeExec =>
-        None
-      case _: ColumnarShuffleExchangeExec =>
-        None
-      case _ =>
-        if (plan.children.length == 1) {
-          childShuffleQueryStageRows(plan.children.head)
-        } else {
-          None
-        }
-    }
-  }
-}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
index 141bf5eea..87c389a65 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala
@@ -104,7 +104,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark w
     (
       countsAndBytes.flatMap(_._2),
       countsAndBytes.map(_._1).sum,
-      BroadCastHashJoinContext(Seq(child.output.head), Inner, false, false, 
child.output, "")
+      BroadCastHashJoinContext(Seq(child.output.head), Inner, true, false, 
false, child.output, "")
     )
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index a8cc79128..70b85165c 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -205,7 +205,13 @@ object OffloadJoin {
         case Some(join: Join) =>
           val leftSize = join.left.stats.sizeInBytes
           val rightSize = join.right.stats.sizeInBytes
-          if (rightSize <= leftSize) BuildRight else BuildLeft
+          val leftRowCount = join.left.stats.rowCount
+          val rightRowCount = join.right.stats.rowCount
+          if (rightSize == leftSize && rightRowCount.isDefined && 
leftRowCount.isDefined) {
+            if (rightRowCount.get <= leftRowCount.get) BuildRight
+            else BuildLeft
+          } else if (rightSize <= leftSize) BuildRight
+          else BuildLeft
         // Only the ShuffledHashJoinExec generated directly in some spark 
tests is not link
         // logical plan, such as OuterJoinSuite.
         case _ => shj.buildSide


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to