This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ccd40a905a [GLUTEN-11550][CORE][UT] Fix GlutenSingleJoinSuite for
Spark 4.0+
ccd40a905a is described below
commit ccd40a905a963cad170dd0504421faab77ab1986
Author: Chang Chen <[email protected]>
AuthorDate: Fri Feb 6 23:15:37 2026 +0800
[GLUTEN-11550][CORE][UT] Fix GlutenSingleJoinSuite for Spark 4.0+
## What changes were proposed in this pull request?
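Recognize the LeftSingle join type (Spark 4.0+) so that joins using it are
rejected by validation and fall back to Spark execution instead of failing
during plan transformation. A new `SparkShims.isLeftSingleJoinType` hook
(false by default, implemented in the Spark 4.0 and 4.1 shims) lets the
shared join code treat LeftSingle like LeftOuter when computing output
attributes and output partitioning, and when checking whether the hash
build side may be on the right. GlutenSingleJoinSuite now mixes in
GlutenSQLTestsBaseTrait instead of GlutenTestsCommonTrait.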
## Why are the changes needed?
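LeftSingle is a join type introduced in Spark 4.0 for correlated scalar
subqueries. It produces the same output as a left outer join, but Spark
raises an error when a left row matches more than one right row. Substrait
has no equivalent join type yet, so when Gluten encounters LeftSingle the
join must gracefully fall back to Spark execution.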
Before this fix, the test `GlutenSingleJoinSuite` failed with:
```
IllegalArgumentException: ShuffledHashJoinExecTransformer not take
LeftSingle as the JoinType
```
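The root cause was that `JoinUtils` and the join transformers threw
`IllegalArgumentException` for the unrecognized join type while computing
`output`/`schema` and output partitioning during plan transformation. This
happened before validation had a chance to reject the join type and trigger
fallback.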
## How was this patch tested?
- All 36 tests in GlutenSingleJoinSuite pass for both Spark 4.0 and 4.1
- Verified that the join falls back to Spark, as shown by the log message:
"Validation failed for plan: ShuffledHashJoin, due to:
- Unsupported join type of LeftSingle for substrait: UNRECOGNIZED"
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
.../scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala | 3 +++
.../scala/org/apache/gluten/execution/JoinExecTransformer.scala | 9 +++++++++
.../src/main/scala/org/apache/gluten/execution/JoinUtils.scala | 5 +++++
.../apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala | 4 ++--
.../apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala | 4 ++--
.../src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala | 7 +++++++
.../scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala | 5 +++++
.../scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala | 5 +++++
8 files changed, 38 insertions(+), 4 deletions(-)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8b32ebce2f..671a29709e 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -91,6 +92,8 @@ trait BackendSettingsApi {
def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
case _: InnerLike | LeftOuter | FullOuter | LeftSemi | LeftAnti | _:
ExistenceJoin => true
+ // LeftSingle is a Spark 4.0+ join type with same semantics as LeftOuter
for build side.
+ case leftSingle if
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) => true
case _ => false
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 4634d7c1fb..980843f05c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -63,6 +63,9 @@ trait ColumnarShuffledJoin extends BaseJoinExec {
case _: InnerLike =>
PartitioningCollection(Seq(left.outputPartitioning,
right.outputPartitioning))
case LeftOuter => left.outputPartitioning
+ // LeftSingle (Spark 4.0+) has same partitioning as LeftOuter
+ case leftSingle if
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+ left.outputPartitioning
case RightOuter => right.outputPartitioning
case FullOuter =>
UnknownPartitioning(left.outputPartitioning.numPartitions)
case LeftExistence(_) => left.outputPartitioning
@@ -157,6 +160,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with
TransformSupport {
case _: InnerLike => expandPartitioning(right.outputPartitioning)
case RightOuter => right.outputPartitioning
case LeftOuter => left.outputPartitioning
+ // LeftSingle (Spark 4.0+) - same as LeftOuter
+ case leftSingle if
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+ left.outputPartitioning
case FullOuter =>
UnknownPartitioning(left.outputPartitioning.numPartitions)
case x =>
throw new IllegalArgumentException(
@@ -166,6 +172,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with
TransformSupport {
joinType match {
case _: InnerLike => expandPartitioning(left.outputPartitioning)
case LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin =>
left.outputPartitioning
+ // LeftSingle (Spark 4.0+) - same as LeftOuter
+ case leftSingle if
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+ left.outputPartitioning
case RightOuter => right.outputPartitioning
case FullOuter =>
UnknownPartitioning(right.outputPartitioning.numPartitions)
case x =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index e6bc3484bc..a7a31cf471 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.expression.{AttributeReferenceTransformer,
ExpressionConverter}
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
@@ -149,6 +150,10 @@ object JoinUtils {
case LeftExistence(_) =>
// LeftSemi | LeftAnti | ExistenceJoin.
(leftOutput, Nil)
+ // LeftSingle is a Spark 4.0+ join type with same output schema as
LeftOuter.
+ // The join will fallback to Spark execution because substraitJoinType
returns UNRECOGNIZED.
+ case leftSingle if
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+ (leftOutput, rightOutput.map(_.withNullability(true)))
case x =>
val joinClass =
Option(callerClassName).getOrElse(this.getClass.getSimpleName)
throw new IllegalArgumentException(s"$joinClass not take $x as the
JoinType")
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
index 876088a14b..148f5f7854 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
@@ -16,6 +16,6 @@
*/
package org.apache.spark.sql.execution.joins
-import org.apache.spark.sql.GlutenTestsCommonTrait
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-class GlutenSingleJoinSuite extends SingleJoinSuite with
GlutenTestsCommonTrait {}
+class GlutenSingleJoinSuite extends SingleJoinSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
index 876088a14b..148f5f7854 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
@@ -16,6 +16,6 @@
*/
package org.apache.spark.sql.execution.joins
-import org.apache.spark.sql.GlutenTestsCommonTrait
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-class GlutenSingleJoinSuite extends SingleJoinSuite with
GlutenTestsCommonTrait {}
+class GlutenSingleJoinSuite extends SingleJoinSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index c2542f0368..1e03b7921a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression,
Expression, InputFileBlockLength, InputFileBlockStart, InputFileName,
RaiseError, UnBase64}
import
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{Distribution,
Partitioning}
@@ -368,4 +369,10 @@ trait SparkShims {
plan: LogicalPlan): SparkPlan
def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean
+
+ /**
+ * Checks if the given JoinType is LeftSingle. LeftSingle is a Spark 4.0+
join type, semantically
+ * similar to LeftOuter. Default implementation returns false for Spark 3.x
compatibility.
+ */
+ def isLeftSingleJoinType(joinType: JoinType): Boolean = false
}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index f329a18cfc..5d2d3258a7 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSingle}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
@@ -775,4 +776,8 @@ class Spark40Shims extends SparkShims {
override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
p.isFinalPlan
}
+
+ override def isLeftSingleJoinType(joinType: JoinType): Boolean = {
+ joinType == LeftSingle
+ }
}
diff --git
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index f7ae54cc29..bb3a7d7248 100644
---
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSingle}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
@@ -774,4 +775,8 @@ class Spark41Shims extends SparkShims {
override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
p.isFinalPlan
}
+
+ override def isLeftSingleJoinType(joinType: JoinType): Boolean = {
+ joinType == LeftSingle
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]