This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new d61c7738bd [GLUTEN-11058][VL][1.5] Backport #11021: remove support for BNLJ full outer join without condition (#11060)
d61c7738bd is described below
commit d61c7738bdfa3909f8b2f5708fbfa642d07628d2
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Nov 14 10:07:01 2025 +0800
[GLUTEN-11058][VL][1.5] Backport #11021: remove support for BNLJ full outer join without condition (#11060)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 --
.../apache/gluten/execution/MiscOperatorSuite.scala | 20 --------------------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 3 ---
.../substrait/SubstraitToVeloxPlanValidator.cc | 7 -------
.../gluten/backendsapi/BackendSettingsApi.scala | 2 --
.../BroadcastNestedLoopJoinExecTransformer.scala | 10 +---------
.../execution/joins/GlutenBroadcastJoinSuite.scala | 21 ++++++---------------
.../spark/sql/gluten/GlutenFallbackSuite.scala | 10 +++++++++-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 5 ++++-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 5 ++++-
10 files changed, 24 insertions(+), 61 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9821efa66e..82684c75b8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -549,8 +549,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def needPreComputeRangeFrameBoundary(): Boolean = true
- override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
-
override def supportIcebergEqualityDeleteRead(): Boolean = false
override def reorderColumnsForPartitionWrite(): Boolean = true
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index e14dd1fcb5..982eff1252 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2058,25 +2057,6 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
- test("FullOuter in BroadcastNestLoopJoin") {
- withTable("t1", "t2") {
- spark.range(10).write.format("parquet").saveAsTable("t1")
- spark.range(10).write.format("parquet").saveAsTable("t2")
-
- // with join condition should fallback.
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
- runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id <
t2.id") {
- checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
- }
-
- // without join condition should offload to gluten operator.
- runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
- checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
- }
- }
- }
- }
-
test("test get_struct_field with scalar function as input") {
withSQLConf("spark.sql.json.enablePartialResults" -> "true") {
withTable("t") {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index f6b5284f92..17ca9c1d1c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -350,9 +350,6 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
VELOX_NYI("Unsupported Join type: {}",
std::to_string(crossRel.type()));
}
break;
- case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
- joinType = core::JoinType::kFull;
- break;
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 1f0a5b3361..1b8df42bc6 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::CrossRel& crossR
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI:
break;
- case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
- if (crossRel.has_expression()) {
- LOG_VALIDATION_MSG("Full outer join type with condition is not
supported in CrossRel");
- return false;
- } else {
- break;
- }
default:
LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
return false;
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index f4b9c46df1..4df7d45f42 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
def needPreComputeRangeFrameBoundary(): Boolean = false
- def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
-
def supportIcebergEqualityDeleteRead(): Boolean = true
def reorderColumnsForPartitionWrite(): Boolean = false
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 0f8e25e2f0..282cf8581e 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
-import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter,
InnerLike, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike,
JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
- case FullOuter
- if
BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
- if (condition.isEmpty) {
- ValidationResult.succeeded
- } else {
- ValidationResult.failed(
- s"FullOuter join with join condition is not supported with
BroadcastNestedLoopJoin")
- }
case ExistenceJoin(_) =>
ValidationResult.succeeded
case _ =>
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
index ce380aebfa..2622e7d599 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala
@@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
// INNER JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2",
blt, BuildLeft)
// FULL JOIN && t1Size < t2Size => BuildLeft
- assertJoinBuildSide(
- "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key <
t2.key",
- bl,
- BuildLeft)
+ assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN
t2", bl, BuildLeft)
// FULL OUTER && t1Size < t2Size => BuildLeft
- assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key <
t2.key", bl, BuildLeft)
+ assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl,
BuildLeft)
// LEFT JOIN => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN
t2", blt, BuildRight)
// RIGHT JOIN => BuildLeft
@@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
// INNER JOIN && broadcast(t2) => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2",
blt, BuildRight)
// FULL OUTER && broadcast(t1) => BuildLeft
- assertJoinBuildSide(
- "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key",
- bl,
- BuildLeft)
+ assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER
JOIN t2", bl, BuildLeft)
// FULL OUTER && broadcast(t2) => BuildRight
assertJoinBuildSide(
- "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key",
+ "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
bl,
BuildRight)
// LEFT JOIN && broadcast(t1) => BuildLeft
@@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite
with GlutenTestsCommon
/* ######## test cases for non-equal join ######### */
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
// For full outer join, prefer to broadcast the smaller side.
- assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key <
t2.key", bl, BuildLeft)
- assertJoinBuildSide(
- "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
- bl,
- BuildRight)
+ assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl,
BuildLeft)
+ assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl,
BuildRight)
// For inner join, prefer to broadcast the smaller side, if
broadcast-able.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size +
1).toString()) {
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index c8a8ac85cd..7b010688ed 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
- execution.get.numFallbackNodes == 0
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ assert(execution.get.numFallbackNodes == 1)
+ assert(
+ execution.get.fallbackNodeToReason.head._2
+ .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
+ } else {
+ assert(execution.get.numFallbackNodes == 0)
+ }
}
// [GLUTEN-4119] Skip add ReusedExchange to fallback node
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
- assert(execution.get.numFallbackNodes == 0)
+ assert(execution.get.numFallbackNodes == 1)
+ assert(
+ execution.get.fallbackNodeToReason.head._2
+ .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
} else {
assert(execution.get.numFallbackNodes == 2)
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 058a63a67d..8edcef1c08 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
- assert(execution.get.numFallbackNodes == 0)
+ assert(execution.get.numFallbackNodes == 1)
+ assert(
+ execution.get.fallbackNodeToReason.head._2
+ .contains("FullOuter join is not supported with
BroadcastNestedLoopJoin"))
} else {
assert(execution.get.numFallbackNodes == 2)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]