This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new d61c7738bd [GLUTEN-11058][VL][1.5] Backport #11021: remove support for BNLJ full outer join without condition (#11060)
d61c7738bd is described below

commit d61c7738bdfa3909f8b2f5708fbfa642d07628d2
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Nov 14 10:07:01 2025 +0800

    [GLUTEN-11058][VL][1.5] Backport #11021: remove support for BNLJ full outer join without condition (#11060)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala     |  2 --
 .../apache/gluten/execution/MiscOperatorSuite.scala | 20 --------------------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc         |  3 ---
 .../substrait/SubstraitToVeloxPlanValidator.cc      |  7 -------
 .../gluten/backendsapi/BackendSettingsApi.scala     |  2 --
 .../BroadcastNestedLoopJoinExecTransformer.scala    | 10 +---------
 .../execution/joins/GlutenBroadcastJoinSuite.scala  | 21 ++++++---------------
 .../spark/sql/gluten/GlutenFallbackSuite.scala      | 10 +++++++++-
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 ++++-
 .../spark/sql/gluten/GlutenFallbackSuite.scala      |  5 ++++-
 10 files changed, 24 insertions(+), 61 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9821efa66e..82684c75b8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -549,8 +549,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 
-  override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
-
   override def supportIcebergEqualityDeleteRead(): Boolean = false
 
   override def reorderColumnsForPartitionWrite(): Boolean = true
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index e14dd1fcb5..982eff1252 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
AQEShuffleReadExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -2058,25 +2057,6 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
-  test("FullOuter in BroadcastNestLoopJoin") {
-    withTable("t1", "t2") {
-      spark.range(10).write.format("parquet").saveAsTable("t1")
-      spark.range(10).write.format("parquet").saveAsTable("t2")
-
-      // with join condition should fallback.
-      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
-        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < 
t2.id") {
-          checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
-        }
-
-        // without join condition should offload to gluten operator.
-        runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
-          checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
-        }
-      }
-    }
-  }
-
   test("test get_struct_field with scalar function as input") {
     withSQLConf("spark.sql.json.enablePartialResults" -> "true") {
       withTable("t") {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index f6b5284f92..17ca9c1d1c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -350,9 +350,6 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
         VELOX_NYI("Unsupported Join type: {}", 
std::to_string(crossRel.type()));
       }
       break;
-    case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
-      joinType = core::JoinType::kFull;
-      break;
     default:
       VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
   }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 1f0a5b3361..1b8df42bc6 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const 
::substrait::CrossRel& crossR
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
     case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI:
       break;
-    case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
-      if (crossRel.has_expression()) {
-        LOG_VALIDATION_MSG("Full outer join type with condition is not 
supported in CrossRel");
-        return false;
-      } else {
-        break;
-      }
     default:
       LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
       return false;
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index f4b9c46df1..4df7d45f42 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
 
-  def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
-
   def supportIcebergEqualityDeleteRead(): Boolean = true
 
   def reorderColumnsForPartitionWrite(): Boolean = false
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 0f8e25e2f0..282cf8581e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
-import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, 
InnerLike, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, 
JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
 import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
   def validateJoinTypeAndBuildSide(): ValidationResult = {
     val result = joinType match {
       case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
-      case FullOuter
-          if 
BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
-        if (condition.isEmpty) {
-          ValidationResult.succeeded
-        } else {
-          ValidationResult.failed(
-            s"FullOuter join with join condition is not supported with 
BroadcastNestedLoopJoin")
-        }
       case ExistenceJoin(_) =>
         ValidationResult.succeeded
       case _ =>
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
index ce380aebfa..2622e7d599 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
@@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
         // INNER JOIN && t1Size < t2Size => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", 
blt, BuildLeft)
         // FULL JOIN && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < 
t2.key",
-          bl,
-          BuildLeft)
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN 
t2", bl, BuildLeft)
         // FULL OUTER && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < 
t2.key", bl, BuildLeft)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
         // LEFT JOIN => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN 
t2", blt, BuildRight)
         // RIGHT JOIN => BuildLeft
@@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
         // INNER JOIN && broadcast(t2) => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", 
blt, BuildRight)
         // FULL OUTER && broadcast(t1) => BuildLeft
-        assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key",
-          bl,
-          BuildLeft)
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER 
JOIN t2", bl, BuildLeft)
         // FULL OUTER && broadcast(t2) => BuildRight
         assertJoinBuildSide(
-          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key",
+          "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
           bl,
           BuildRight)
         // LEFT JOIN && broadcast(t1) => BuildLeft
@@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite 
with GlutenTestsCommon
       /* ######## test cases for non-equal join ######### */
       withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
         // For full outer join, prefer to broadcast the smaller side.
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < 
t2.key", bl, BuildLeft)
-        assertJoinBuildSide(
-          "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
-          bl,
-          BuildRight)
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
+        assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, 
BuildRight)
 
         // For inner join, prefer to broadcast the smaller side, if 
broadcast-able.
         withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 
1).toString()) {
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index c8a8ac85cd..7b010688ed 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
 
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
-      execution.get.numFallbackNodes == 0
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
+      } else {
+        assert(execution.get.numFallbackNodes == 0)
+      }
     }
 
     // [GLUTEN-4119] Skip add ReusedExchange to fallback node
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 0)
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
       val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
       val execution = glutenStore.execution(id)
       if (BackendTestUtils.isVeloxBackendLoaded()) {
-        assert(execution.get.numFallbackNodes == 0)
+        assert(execution.get.numFallbackNodes == 1)
+        assert(
+          execution.get.fallbackNodeToReason.head._2
+            .contains("FullOuter join is not supported with 
BroadcastNestedLoopJoin"))
       } else {
         assert(execution.get.numFallbackNodes == 2)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to