This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ccd40a905a [GLUTEN-11550][CORE][UT] Fix GlutenSingleJoinSuite for Spark 4.0+
ccd40a905a is described below

commit ccd40a905a963cad170dd0504421faab77ab1986
Author: Chang Chen <[email protected]>
AuthorDate: Fri Feb 6 23:15:37 2026 +0800

    [GLUTEN-11550][CORE][UT] Fix GlutenSingleJoinSuite for Spark 4.0+
    
    ## What changes were proposed in this pull request?
    
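    Recognize the LeftSingle join type in Gluten's join output-schema and
    output-partitioning logic, so that joins of this type pass through plan
    construction, are rejected by validation, and fall back cleanly to Spark
    execution in Spark 4.0+. Native (Substrait) execution of LeftSingle is
    still not supported.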
    
    ## Why are the changes needed?
    
    LeftSingle is a new join type introduced in Spark 4.0 for scalar subquery
    correlation. When Gluten encounters this join type, it should gracefully
    fall back to Spark execution since Substrait does not support it yet.
    
    Before this fix, the test `GlutenSingleJoinSuite` failed with:
    ```
    IllegalArgumentException: ShuffledHashJoinExecTransformer not take
    LeftSingle as the JoinType
    ```
    
    The root cause was that exceptions were thrown during plan transformation
    (when accessing `output`/`schema` properties) before validation could
    detect the unsupported join type and trigger fallback.
    
    ## How was this patch tested?
    
    - All 36 tests in GlutenSingleJoinSuite pass for both Spark 4.0 and 4.1
    - Verified fallback works correctly with log message:
      "Validation failed for plan: ShuffledHashJoin, due to:
       - Unsupported join type of LeftSingle for substrait: UNRECOGNIZED"
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 .../scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala | 3 +++
 .../scala/org/apache/gluten/execution/JoinExecTransformer.scala  | 9 +++++++++
 .../src/main/scala/org/apache/gluten/execution/JoinUtils.scala   | 5 +++++
 .../apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala | 4 ++--
 .../apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala | 4 ++--
 .../src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala  | 7 +++++++
 .../scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala | 5 +++++
 .../scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala | 5 +++++
 8 files changed, 38 insertions(+), 4 deletions(-)

diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8b32ebce2f..671a29709e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.ValidationResult
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
@@ -91,6 +92,8 @@ trait BackendSettingsApi {
 
   def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
     case _: InnerLike | LeftOuter | FullOuter | LeftSemi | LeftAnti | _: 
ExistenceJoin => true
+    // LeftSingle is a Spark 4.0+ join type with same semantics as LeftOuter 
for build side.
+    case leftSingle if 
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) => true
     case _ => false
   }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 4634d7c1fb..980843f05c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -63,6 +63,9 @@ trait ColumnarShuffledJoin extends BaseJoinExec {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
     case LeftOuter => left.outputPartitioning
+    // LeftSingle (Spark 4.0+) has same partitioning as LeftOuter
+    case leftSingle if 
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+      left.outputPartitioning
     case RightOuter => right.outputPartitioning
     case FullOuter => 
UnknownPartitioning(left.outputPartitioning.numPartitions)
     case LeftExistence(_) => left.outputPartitioning
@@ -157,6 +160,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with 
TransformSupport {
         case _: InnerLike => expandPartitioning(right.outputPartitioning)
         case RightOuter => right.outputPartitioning
         case LeftOuter => left.outputPartitioning
+        // LeftSingle (Spark 4.0+) - same as LeftOuter
+        case leftSingle if 
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+          left.outputPartitioning
         case FullOuter => 
UnknownPartitioning(left.outputPartitioning.numPartitions)
         case x =>
           throw new IllegalArgumentException(
@@ -166,6 +172,9 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with 
TransformSupport {
       joinType match {
         case _: InnerLike => expandPartitioning(left.outputPartitioning)
         case LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => 
left.outputPartitioning
+        // LeftSingle (Spark 4.0+) - same as LeftOuter
+        case leftSingle if 
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+          left.outputPartitioning
         case RightOuter => right.outputPartitioning
         case FullOuter => 
UnknownPartitioning(right.outputPartitioning.numPartitions)
         case x =>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index e6bc3484bc..a7a31cf471 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.expression.{AttributeReferenceTransformer, 
ExpressionConverter}
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode}
 import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, 
ExtensionBuilder}
@@ -149,6 +150,10 @@ object JoinUtils {
       case LeftExistence(_) =>
         // LeftSemi | LeftAnti | ExistenceJoin.
         (leftOutput, Nil)
+      // LeftSingle is a Spark 4.0+ join type with same output schema as 
LeftOuter.
+      // The join will fallback to Spark execution because substraitJoinType 
returns UNRECOGNIZED.
+      case leftSingle if 
SparkShimLoader.getSparkShims.isLeftSingleJoinType(leftSingle) =>
+        (leftOutput, rightOutput.map(_.withNullability(true)))
       case x =>
         val joinClass = 
Option(callerClassName).getOrElse(this.getClass.getSimpleName)
         throw new IllegalArgumentException(s"$joinClass not take $x as the 
JoinType")
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
index 876088a14b..148f5f7854 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
@@ -16,6 +16,6 @@
  */
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.sql.GlutenTestsCommonTrait
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
-class GlutenSingleJoinSuite extends SingleJoinSuite with 
GlutenTestsCommonTrait {}
+class GlutenSingleJoinSuite extends SingleJoinSuite with 
GlutenSQLTestsBaseTrait {}
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
index 876088a14b..148f5f7854 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/joins/GlutenSingleJoinSuite.scala
@@ -16,6 +16,6 @@
  */
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.sql.GlutenTestsCommonTrait
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
-class GlutenSingleJoinSuite extends SingleJoinSuite with 
GlutenTestsCommonTrait {}
+class GlutenSingleJoinSuite extends SingleJoinSuite with 
GlutenSQLTestsBaseTrait {}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index c2542f0368..1e03b7921a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, 
Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, 
RaiseError, UnBase64}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning}
@@ -368,4 +369,10 @@ trait SparkShims {
       plan: LogicalPlan): SparkPlan
 
   def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean
+
+  /**
+   * Checks if the given JoinType is LeftSingle. LeftSingle is a Spark 4.0+ 
join type, semantically
+   * similar to LeftOuter. Default implementation returns false for Spark 3.x 
compatibility.
+   */
+  def isLeftSingleJoinType(joinType: JoinType): Boolean = false
 }
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index f329a18cfc..5d2d3258a7 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSingle}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
@@ -775,4 +776,8 @@ class Spark40Shims extends SparkShims {
   override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
     p.isFinalPlan
   }
+
+  override def isLeftSingleJoinType(joinType: JoinType): Boolean = {
+    joinType == LeftSingle
+  }
 }
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index f7ae54cc29..bb3a7d7248 100644
--- 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSingle}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
@@ -774,4 +775,8 @@ class Spark41Shims extends SparkShims {
   override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
     p.isFinalPlan
   }
+
+  override def isLeftSingleJoinType(joinType: JoinType): Boolean = {
+    joinType == LeftSingle
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to