This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8fdbce052c [SPARK-53274][SQL] Support left and right join pushdown in JDBCScanBuilder
cd8fdbce052c is described below

commit cd8fdbce052cbae9f59389e65ea596dddc4d7190
Author: Petar Vasiljevic <petar.vasilje...@databricks.com>
AuthorDate: Fri Aug 15 22:19:17 2025 +0800

    [SPARK-53274][SQL] Support left and right join pushdown in JDBCScanBuilder
    
    ### What changes were proposed in this pull request?
    Currently, only inner joins are supported for pushdown in
    JDBCScanBuilder. With this PR, left and right joins are supported as
    well. Joins without a join condition are not pushed down, which matches
    the existing behaviour for inner joins.
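
    A minimal usage sketch (catalog and table names are placeholders, not
    from this patch); with DSv2 join pushdown enabled via the same SQLConf
    entry the new tests toggle, a LEFT JOIN between two tables of one JDBC
    catalog can now be executed by the remote database:

        import org.apache.spark.sql.internal.SQLConf

        // Enable DSv2 join pushdown (off by default).
        spark.conf.set(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key, "true")
        spark.sql(
          """SELECT t1.id, t2.surname
            |FROM jdbc_catalog.ns.tab1 t1
            |LEFT JOIN jdbc_catalog.ns.tab2 t2 ON t1.id = t2.id""".stripMargin)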
    
    ### Why are the changes needed?
    Pushing left and right joins down to the JDBC source lets the remote
    database execute more of the query; previously only inner joins
    qualified for join pushdown.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52017 from PetarVasiljevic-DB/support_left_right_joins.
    
    Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/connector/join/JoinType.java  |  2 +
 .../execution/datasources/DataSourceStrategy.scala |  4 +-
 .../datasources/v2/jdbc/JDBCScanBuilder.scala      |  2 +
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   | 54 ++++++++++++++++++++--
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
index 23ef609201eb..56fb0a51c243 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
@@ -27,4 +27,6 @@ import org.apache.spark.annotation.Evolving;
 @Evolving
 public enum JoinType {
     INNER_JOIN,
+    LEFT_OUTER_JOIN,
+    RIGHT_OUTER_JOIN,
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 15b34457923f..2e47f08ac115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
@@ -513,6 +513,8 @@ object DataSourceStrategy
   def translateJoinType(joinType: JoinType): Option[V2JoinType] = {
     joinType match {
       case Inner => Some(V2JoinType.INNER_JOIN)
+      case LeftOuter => Some(V2JoinType.LEFT_OUTER_JOIN)
+      case RightOuter => Some(V2JoinType.RIGHT_OUTER_JOIN)
       case _ => None
     }
   }
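
A hedged sketch (not part of the patch) of how the new translation behaves:
catalyst join types now map to the DSv2 enum for inner, left outer and right
outer joins, while anything else still maps to None, which callers treat as
"do not push down". FullOuter is used here only as an example of a type that
remains unsupported.

    import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter}
    import org.apache.spark.sql.connector.join.{JoinType => V2JoinType}
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy

    // Newly supported mappings to the DSv2 join type enum.
    assert(DataSourceStrategy.translateJoinType(Inner).contains(V2JoinType.INNER_JOIN))
    assert(DataSourceStrategy.translateJoinType(LeftOuter).contains(V2JoinType.LEFT_OUTER_JOIN))
    assert(DataSourceStrategy.translateJoinType(RightOuter).contains(V2JoinType.RIGHT_OUTER_JOIN))
    // Unsupported join types translate to None and are not pushed down.
    assert(DataSourceStrategy.translateJoinType(FullOuter).isEmpty)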
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
index dd1ce5fe9fd2..b758ddd35e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
@@ -187,6 +187,8 @@ case class JDBCScanBuilder(
 
     val joinTypeStringOption = joinType match {
       case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case JoinType.LEFT_OUTER_JOIN => Some("LEFT JOIN")
+      case JoinType.RIGHT_OUTER_JOIN => Some("RIGHT JOIN")
       case _ => None
     }
     if (!joinTypeStringOption.isDefined) {
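
For illustration only: with the new cases, the SQL that JDBCScanBuilder pushes
to the remote database for a left outer join takes roughly the following
shape. Table names here are placeholders, and the exact quoting and aliasing
depend on the JDBC dialect; this is not the literal generated string.

    // Hypothetical shape of the pushed-down query, not JDBCScanBuilder's exact output.
    val pushedQuery =
      """SELECT t1.id, t1.address, t2.surname
        |FROM join_table_1 t1
        |LEFT JOIN join_table_2 t2 ON t1.id = t2.id""".stripMargin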
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 514c487cfae8..d0639f77d6da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -561,39 +561,85 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     }
   }
 
-  test("Test left outer join should not be pushed down") {
+  test("Test left outer join with condition should be pushed down") {
     val sqlQuery =
       s"""
          |SELECT t1.id, t1.address, t2.surname
          |FROM $catalogAndNamespace.$casedJoinTableName1 t1
-         |LEFT JOIN $catalogAndNamespace.$casedJoinTableName2 t2 ON t1.id = t2.id
+         |LEFT JOIN $catalogAndNamespace.$casedJoinTableName2 t2
+         |ON t1.id = t2.id
          |""".stripMargin
 
     val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
       sql(sqlQuery).collect().toSeq
     }
 
+    assert(rows.nonEmpty)
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test left outer join without condition - no pushdown") {
+    val sqlQuery =
+      s"""
+         |SELECT * FROM
+         |$catalogAndNamespace.$casedJoinTableName1 a
+         |LEFT JOIN
+         |$catalogAndNamespace.$casedJoinTableName2 b
+         |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
       checkJoinNotPushed(df)
       checkAnswer(df, rows)
     }
   }
 
-  test("Test right outer join should not be pushed down") {
+  test("Test right outer join with condition should be pushed down") {
     val sqlQuery =
       s"""
          |SELECT t1.id, t1.address, t2.surname
          |FROM $catalogAndNamespace.$casedJoinTableName1 t1
-         |RIGHT JOIN $catalogAndNamespace.$casedJoinTableName2 t2 ON t1.id = t2.id
+         |RIGHT JOIN $catalogAndNamespace.$casedJoinTableName2 t2
+         |ON t1.id = t2.id
          |""".stripMargin
 
     val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
       sql(sqlQuery).collect().toSeq
     }
 
+    assert(rows.nonEmpty)
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test right outer join without condition - no pushdown") {
+    val sqlQuery =
+      s"""
+         |SELECT * FROM
+         |$catalogAndNamespace.$casedJoinTableName1 a
+         |RIGHT JOIN
+         |$catalogAndNamespace.$casedJoinTableName2 b
+         |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
       checkJoinNotPushed(df)
       checkAnswer(df, rows)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
