This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 971481f419 [GLUTEN-8964][VL] Support BNLJ full outer join without condition (#8965)
971481f419 is described below

commit 971481f419a4cb69809795075ceaccde222e9bf5
Author: WangGuangxin <[email protected]>
AuthorDate: Wed Mar 19 01:06:30 2025 -0700

    [GLUTEN-8964][VL] Support BNLJ full outer join without condition (#8965)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala     |  4 ++--
 .../apache/gluten/execution/MiscOperatorSuite.scala | 20 ++++++++++++++++++++
 cpp/velox/substrait/SubstraitToVeloxPlan.cc         |  3 +++
 .../substrait/SubstraitToVeloxPlanValidator.cc      |  7 +++++++
 .../gluten/backendsapi/BackendSettingsApi.scala     |  4 ++--
 .../BroadcastNestedLoopJoinExecTransformer.scala    |  8 ++++++++
 .../execution/joins/GlutenBroadcastJoinSuite.scala  | 21 +++++++++++++++------
 .../spark/sql/gluten/GlutenFallbackSuite.scala      | 10 +---------
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 +----
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 +----
 10 files changed, 60 insertions(+), 27 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 0d13a4e9a6..9e3837b062 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -556,8 +556,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportCartesianProductExec(): Boolean = true
 
-  override def supportBroadcastNestedLoopJoinExec(): Boolean = true
-
   override def supportSampleExec(): Boolean = true
 
   override def supportColumnarArrowUdf(): Boolean = true
@@ -566,4 +564,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportCollectLimitExec(): Boolean = true
 
+  override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
+
 }
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 0f1f9408d0..00e680721d 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1990,4 +1991,23 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
       checkGlutenOperatorMatch[HashAggregateExecTransformer]
     }
   }
+
+  test("FullOuter in BroadcastNestLoopJoin") {
+    withTable("t1", "t2") {
+      spark.range(10).write.format("parquet").saveAsTable("t1")
+      spark.range(10).write.format("parquet").saveAsTable("t2")
+
+      // with join condition should fallback.
+      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
+        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < t2.id") {
+          checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
+        }
+
+        // without join condition should offload to gluten operator.
+        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
+          checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
+        }
+      }
+    }
+  }
 }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 4e0e36cefb..0ef2cd6e03 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -343,6 +343,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_LEFT:
       joinType = core::JoinType::kLeft;
       break;
+    case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
+      joinType = core::JoinType::kFull;
+      break;
     default:
       VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
   }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 3de256ff31..21e43a7c7b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1050,6 +1050,13 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_INNER:
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
       break;
+    case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
+      if (crossRel.has_expression()) {
+        LOG_VALIDATION_MSG("Full outer join type with condition is not supported in CrossRel");
+        return false;
+      } else {
+        break;
+      }
     default:
       LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
       return false;
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 84a4d04bfa..c8d72c0af6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -145,8 +145,6 @@ trait BackendSettingsApi {
 
   def supportCartesianProductExecWithCondition(): Boolean = true
 
-  def supportBroadcastNestedLoopJoinExec(): Boolean = true
-
   def supportSampleExec(): Boolean = false
 
   def supportColumnarArrowUdf(): Boolean = false
@@ -155,4 +153,6 @@ trait BackendSettingsApi {
 
   def supportCollectLimitExec(): Boolean = false
 
+  def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
+
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 9f12cdefee..5f3d2889ee 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -169,6 +169,14 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
   def validateJoinTypeAndBuildSide(): ValidationResult = {
     val result = joinType match {
       case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
+      case FullOuter
+          if BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
+        if (condition.isEmpty) {
+          ValidationResult.succeeded
+        } else {
+          ValidationResult.failed(
+            s"FullOuter join with join condition is not supported with BroadcastNestedLoopJoin")
+        }
       case _ =>
         ValidationResult.failed(s"$joinType join is not supported with BroadcastNestedLoopJoin")
     }
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
index 7d83405eba..9f82301258 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
@@ -142,9 +142,12 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
         // INNER JOIN && t1Size < t2Size => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", blt, BuildLeft)
         // FULL JOIN && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft)
+        assertJoinBuildSide(
+          "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < t2.key",
+          bl,
+          BuildLeft)
         // FULL OUTER && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < t2.key", bl, BuildLeft)
         // LEFT JOIN => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", blt, BuildRight)
         // RIGHT JOIN => BuildLeft
@@ -156,10 +159,13 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
         // INNER JOIN && broadcast(t2) => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", blt, BuildRight)
         // FULL OUTER && broadcast(t1) => BuildLeft
-        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
+        assertJoinBuildSide(
+          "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
+          bl,
+          BuildLeft)
         // FULL OUTER && broadcast(t2) => BuildRight
         assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
+          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
           bl,
           BuildRight)
         // LEFT JOIN && broadcast(t1) => BuildLeft
@@ -193,8 +199,11 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
       /* ######## test cases for non-equal join ######### */
       withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
         // For full outer join, prefer to broadcast the smaller side.
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
-        assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", bl, BuildLeft)
+        assertJoinBuildSide(
+          "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
+          bl,
+          BuildRight)
 
         // For inner join, prefer to broadcast the smaller side, if broadcast-able.
         withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) {
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 4b67a84495..4fd7471648 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenBuildInfo
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
-import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
@@ -116,14 +115,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
 
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
-      if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 1)
-        assert(
-          execution.get.fallbackNodeToReason.head._2
-            .contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
-      } else {
-        assert(execution.get.numFallbackNodes == 0)
-      }
+      execution.get.numFallbackNodes == 0
     }
 
     // [GLUTEN-4119] Skip add ReusedExchange to fallback node
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 47b70c528c..2afb18120d 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -118,10 +118,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 1)
-        assert(
-          execution.get.fallbackNodeToReason.head._2
-            .contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
+        assert(execution.get.numFallbackNodes == 0)
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 47b70c528c..2afb18120d 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -118,10 +118,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 1)
-        assert(
-          execution.get.fallbackNodeToReason.head._2
-            .contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
+        assert(execution.get.numFallbackNodes == 0)
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to