This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 21c0e43483 [VL] Remove support for BNLJ full outer join without 
condition (#11021)
21c0e43483 is described below

commit 21c0e43483dd3de0dd88a967718221321dda1723
Author: Mingliang Zhu <[email protected]>
AuthorDate: Wed Nov 5 09:00:29 2025 +0800

    [VL] Remove support for BNLJ full outer join without condition (#11021)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala     |  2 --
 .../apache/gluten/execution/MiscOperatorSuite.scala | 20 --------------------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc         |  3 ---
 .../substrait/SubstraitToVeloxPlanValidator.cc      |  7 -------
 .../gluten/backendsapi/BackendSettingsApi.scala     |  2 --
 .../BroadcastNestedLoopJoinExecTransformer.scala    | 10 +---------
 .../execution/joins/GlutenBroadcastJoinSuite.scala  | 21 ++++++---------------
 .../spark/sql/gluten/GlutenFallbackSuite.scala      | 10 +++++++++-
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 ++++-
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 ++++-
 10 files changed, 24 insertions(+), 61 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 3da306e1e9..e97747b842 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -568,8 +568,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 
-  override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
-
   override def supportIcebergEqualityDeleteRead(): Boolean = false
 
   override def reorderColumnsForPartitionWrite(): Boolean = true
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index d8b3929ded..b6578c0ea5 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
AQEShuffleReadExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -2065,25 +2064,6 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
-  test("FullOuter in BroadcastNestLoopJoin") {
-    withTable("t1", "t2") {
-      spark.range(10).write.format("parquet").saveAsTable("t1")
-      spark.range(10).write.format("parquet").saveAsTable("t2")
-
-      // with join condition should fallback.
-      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
-        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < 
t2.id") {
-          checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
-        }
-
-        // without join condition should offload to gluten operator.
-        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
-          checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
-        }
-      }
-    }
-  }
-
   test("test get_struct_field with scalar function as input") {
     withSQLConf("spark.sql.json.enablePartialResults" -> "true") {
       withTable("t") {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 5b76c22d8c..fc58302c14 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -387,9 +387,6 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
         VELOX_NYI("Unsupported Join type: {}", 
std::to_string(crossRel.type()));
       }
       break;
-    case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
-      joinType = core::JoinType::kFull;
-      break;
     default:
       VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
   }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 1d953cfd66..ad6c0e15a8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const 
::substrait::CrossRel& crossR
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI:
       break;
-    case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
-      if (crossRel.has_expression()) {
-        LOG_VALIDATION_MSG("Full outer join type with condition is not 
supported in CrossRel");
-        return false;
-      } else {
-        break;
-      }
     default:
       LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
       return false;
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 7c84218681..a1647b2f30 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -149,8 +149,6 @@ trait BackendSettingsApi {
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
 
-  def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
-
   def supportIcebergEqualityDeleteRead(): Boolean = true
 
   def reorderColumnsForPartitionWrite(): Boolean = false
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 0f8e25e2f0..282cf8581e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
-import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, 
InnerLike, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, 
JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
 import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
   def validateJoinTypeAndBuildSide(): ValidationResult = {
     val result = joinType match {
       case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
-      case FullOuter
-          if 
BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
-        if (condition.isEmpty) {
-          ValidationResult.succeeded
-        } else {
-          ValidationResult.failed(
-            s"FullOuter join with join condition is not supported with 
BroadcastNestedLoopJoin")
-        }
       case ExistenceJoin(_) =>
         ValidationResult.succeeded
       case _ =>
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
index ce380aebfa..2622e7d599 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
@@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
         // INNER JOIN && t1Size < t2Size => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", 
blt, BuildLeft)
         // FULL JOIN && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < 
t2.key",
-          bl,
-          BuildLeft)
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN 
t2", bl, BuildLeft)
         // FULL OUTER && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < 
t2.key", bl, BuildLeft)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
         // LEFT JOIN => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN 
t2", blt, BuildRight)
         // RIGHT JOIN => BuildLeft
@@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
         // INNER JOIN && broadcast(t2) => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", 
blt, BuildRight)
         // FULL OUTER && broadcast(t1) => BuildLeft
-        assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key",
-          bl,
-          BuildLeft)
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER 
JOIN t2", bl, BuildLeft)
         // FULL OUTER && broadcast(t2) => BuildRight
         assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key",
+          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
           bl,
           BuildRight)
         // LEFT JOIN && broadcast(t1) => BuildLeft
@@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
       /* ######## test cases for non-equal join ######### */
       withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
         // For full outer join, prefer to broadcast the smaller side.
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key", bl, BuildLeft)
-        assertJoinBuildSide(
-          "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
-          bl,
-          BuildRight)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
+        assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, 
BuildRight)
 
         // For inner join, prefer to broadcast the smaller side, if 
broadcast-able.
         withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 
1).toString()) {
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index c8a8ac85cd..7b010688ed 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
 
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
-      execution.get.numFallbackNodes == 0
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
+      } else {
+        assert(execution.get.numFallbackNodes == 0)
+      }
     }
 
     // [GLUTEN-4119] Skip add ReusedExchange to fallback node
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 0)
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 0)
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to