This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 971481f419 [GLUTEN-8964][VL] Support BNLJ full outer join without condition (#8965)
971481f419 is described below
commit 971481f419a4cb69809795075ceaccde222e9bf5
Author: WangGuangxin <[email protected]>
AuthorDate: Wed Mar 19 01:06:30 2025 -0700
[GLUTEN-8964][VL] Support BNLJ full outer join without condition (#8965)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 4 ++--
.../apache/gluten/execution/MiscOperatorSuite.scala | 20 ++++++++++++++++++++
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 3 +++
.../substrait/SubstraitToVeloxPlanValidator.cc | 7 +++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 4 ++--
.../BroadcastNestedLoopJoinExecTransformer.scala | 8 ++++++++
.../execution/joins/GlutenBroadcastJoinSuite.scala | 21 +++++++++++++++------
.../spark/sql/gluten/GlutenFallbackSuite.scala | 10 +---------
.../spark/sql/gluten/GlutenFallbackSuite.scala | 5 +----
.../spark/sql/gluten/GlutenFallbackSuite.scala | 5 +----
10 files changed, 60 insertions(+), 27 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 0d13a4e9a6..9e3837b062 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -556,8 +556,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportCartesianProductExec(): Boolean = true
- override def supportBroadcastNestedLoopJoinExec(): Boolean = true
-
override def supportSampleExec(): Boolean = true
override def supportColumnarArrowUdf(): Boolean = true
@@ -566,4 +564,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportCollectLimitExec(): Boolean = true
+ override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
+
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 0f1f9408d0..00e680721d 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1990,4 +1991,23 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
checkGlutenOperatorMatch[HashAggregateExecTransformer]
}
}
+
+ test("FullOuter in BroadcastNestLoopJoin") {
+ withTable("t1", "t2") {
+ spark.range(10).write.format("parquet").saveAsTable("t1")
+ spark.range(10).write.format("parquet").saveAsTable("t2")
+
+ // with join condition should fallback.
+ withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
+ runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id <
t2.id") {
+ checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
+ }
+
+ // without join condition should offload to gluten operator.
+ runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
+ checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
+ }
+ }
+ }
+ }
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 4e0e36cefb..0ef2cd6e03 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -343,6 +343,9 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_LEFT:
joinType = core::JoinType::kLeft;
break;
+ case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
+ joinType = core::JoinType::kFull;
+ break;
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 3de256ff31..21e43a7c7b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1050,6 +1050,13 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::CrossRel& crossR
case ::substrait::CrossRel_JoinType_JOIN_TYPE_INNER:
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
break;
+ case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
+ if (crossRel.has_expression()) {
+ LOG_VALIDATION_MSG("Full outer join type with condition is not
supported in CrossRel");
+ return false;
+ } else {
+ break;
+ }
default:
LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
return false;
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 84a4d04bfa..c8d72c0af6 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -145,8 +145,6 @@ trait BackendSettingsApi {
def supportCartesianProductExecWithCondition(): Boolean = true
- def supportBroadcastNestedLoopJoinExec(): Boolean = true
-
def supportSampleExec(): Boolean = false
def supportColumnarArrowUdf(): Boolean = false
@@ -155,4 +153,6 @@ trait BackendSettingsApi {
def supportCollectLimitExec(): Boolean = false
+ def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
+
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 9f12cdefee..5f3d2889ee 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -169,6 +169,14 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
+ case FullOuter
+ if
BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
+ if (condition.isEmpty) {
+ ValidationResult.succeeded
+ } else {
+ ValidationResult.failed(
+ s"FullOuter join with join condition is not supported with
BroadcastNestedLoopJoin")
+ }
case _ =>
ValidationResult.failed(s"$joinType join is not supported with
BroadcastNestedLoopJoin")
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
index 7d83405eba..9f82301258 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
@@ -142,9 +142,12 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
// INNER JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2",
blt, BuildLeft)
// FULL JOIN && t1Size < t2Size => BuildLeft
- assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN
t2", bl, BuildLeft)
+ assertJoinBuildSide(
+ "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key <
t2.key",
+ bl,
+ BuildLeft)
// FULL OUTER && t1Size < t2Size => BuildLeft
- assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl,
BuildLeft)
+ assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key <
t2.key", bl, BuildLeft)
// LEFT JOIN => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN
t2", blt, BuildRight)
// RIGHT JOIN => BuildLeft
@@ -156,10 +159,13 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
// INNER JOIN && broadcast(t2) => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2",
blt, BuildRight)
// FULL OUTER && broadcast(t1) => BuildLeft
- assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER
JOIN t2", bl, BuildLeft)
+ assertJoinBuildSide(
+ "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key",
+ bl,
+ BuildLeft)
// FULL OUTER && broadcast(t2) => BuildRight
assertJoinBuildSide(
- "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
+ "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key",
bl,
BuildRight)
// LEFT JOIN && broadcast(t1) => BuildLeft
@@ -193,8 +199,11 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
/* ######## test cases for non-equal join ######### */
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
// For full outer join, prefer to broadcast the smaller side.
- assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl,
BuildLeft)
- assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl,
BuildRight)
+ assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key", bl, BuildLeft)
+ assertJoinBuildSide(
+ "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
+ bl,
+ BuildRight)
// For inner join, prefer to broadcast the smaller side, if
broadcast-able.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size +
1).toString()) {
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 4b67a84495..4fd7471648 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenBuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.execution.FileSourceScanExecTransformer
-import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
@@ -116,14 +115,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
- if (BackendTestUtils.isVeloxBackendLoaded()) {
- assert(execution.get.numFallbackNodes == 1)
- assert(
- execution.get.fallbackNodeToReason.head._2
- .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
- } else {
- assert(execution.get.numFallbackNodes == 0)
- }
+ execution.get.numFallbackNodes == 0
}
// [GLUTEN-4119] Skip add ReusedExchange to fallback node
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 47b70c528c..2afb18120d 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -118,10 +118,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
- assert(execution.get.numFallbackNodes == 1)
- assert(
- execution.get.fallbackNodeToReason.head._2
- .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
+ assert(execution.get.numFallbackNodes == 0)
} else {
assert(execution.get.numFallbackNodes == 2)
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 47b70c528c..2afb18120d 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -118,10 +118,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
- assert(execution.get.numFallbackNodes == 1)
- assert(
- execution.get.fallbackNodeToReason.head._2
- .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
+ assert(execution.get.numFallbackNodes == 0)
} else {
assert(execution.get.numFallbackNodes == 2)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]