This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new cd8fdbce052c [SPARK-53274][SQL] Support left and right join pushdown in JDBCScanBuilder
cd8fdbce052c is described below

commit cd8fdbce052cbae9f59389e65ea596dddc4d7190
Author: Petar Vasiljevic <petar.vasilje...@databricks.com>
AuthorDate: Fri Aug 15 22:19:17 2025 +0800

    [SPARK-53274][SQL] Support left and right join pushdown in JDBCScanBuilder

    Currently, only inner joins are supported for pushdown in JDBCScanBuilder.

    ### What changes were proposed in this pull request?
    With this PR, left and right outer joins are supported as well. Joins
    without a join condition are not pushed down, matching the existing
    behaviour for inner joins.

    ### Why are the changes needed?
    Pushing left and right outer joins down to the JDBC source lets the
    remote database evaluate the join, instead of transferring both tables
    to Spark and joining there.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    New tests.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #52017 from PetarVasiljevic-DB/support_left_right_joins.

    Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/connector/join/JoinType.java  |  2 +
 .../execution/datasources/DataSourceStrategy.scala |  4 +-
 .../datasources/v2/jdbc/JDBCScanBuilder.scala      |  2 +
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   | 54 ++++++++++++++++++++--
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
index 23ef609201eb..56fb0a51c243 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinType.java
@@ -27,4 +27,6 @@ import org.apache.spark.annotation.Evolving;
 @Evolving
 public enum JoinType {
   INNER_JOIN,
+  LEFT_OUTER_JOIN,
+  RIGHT_OUTER_JOIN,
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 15b34457923f..2e47f08ac115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
@@ -513,6 +513,8 @@ object DataSourceStrategy
   def translateJoinType(joinType: JoinType): Option[V2JoinType] = {
     joinType match {
       case Inner => Some(V2JoinType.INNER_JOIN)
+      case LeftOuter => Some(V2JoinType.LEFT_OUTER_JOIN)
+      case RightOuter => Some(V2JoinType.RIGHT_OUTER_JOIN)
       case _ => None
     }
   }
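As an illustration (not part of the patch), a minimal sketch of how the new
translateJoinType mapping behaves. The assertions below are assumptions about
caller-side usage, and FullOuter stands in for any still-unsupported join type:

    import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
    import org.apache.spark.sql.connector.join.{JoinType => V2JoinType}
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy

    // The newly handled catalyst join types translate to their V2 enum values.
    assert(DataSourceStrategy.translateJoinType(LeftOuter).contains(V2JoinType.LEFT_OUTER_JOIN))
    assert(DataSourceStrategy.translateJoinType(RightOuter).contains(V2JoinType.RIGHT_OUTER_JOIN))
    // Anything else (e.g. a full outer join) still yields None, which tells the
    // planner not to attempt join pushdown for that plan node.
    assert(DataSourceStrategy.translateJoinType(FullOuter).isEmpty)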
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
index dd1ce5fe9fd2..b758ddd35e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
@@ -187,6 +187,8 @@ case class JDBCScanBuilder(
     val joinTypeStringOption = joinType match {
       case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case JoinType.LEFT_OUTER_JOIN => Some("LEFT JOIN")
+      case JoinType.RIGHT_OUTER_JOIN => Some("RIGHT JOIN")
       case _ => None
     }
     if (!joinTypeStringOption.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 514c487cfae8..d0639f77d6da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -561,39 +561,85 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     }
   }
 
-  test("Test left outer join should not be pushed down") {
+  test("Test left outer join with condition should be pushed down") {
     val sqlQuery =
       s"""
         |SELECT t1.id, t1.address, t2.surname
         |FROM $catalogAndNamespace.$casedJoinTableName1 t1
-        |LEFT JOIN $catalogAndNamespace.$casedJoinTableName2 t2 ON t1.id = t2.id
+        |LEFT JOIN $catalogAndNamespace.$casedJoinTableName2 t2
+        |ON t1.id = t2.id
         |""".stripMargin
 
     val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
       sql(sqlQuery).collect().toSeq
     }
+    assert(rows.nonEmpty)
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test left outer join without condition - no pushdown") {
+    val sqlQuery =
+      s"""
+        |SELECT * FROM
+        |$catalogAndNamespace.$casedJoinTableName1 a
+        |LEFT JOIN
+        |$catalogAndNamespace.$casedJoinTableName2 b
+        |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      checkJoinNotPushed(df)
       checkAnswer(df, rows)
     }
   }
 
-  test("Test right outer join should not be pushed down") {
+  test("Test right outer join with condition should be pushed down") {
     val sqlQuery =
       s"""
         |SELECT t1.id, t1.address, t2.surname
         |FROM $catalogAndNamespace.$casedJoinTableName1 t1
-        |RIGHT JOIN $catalogAndNamespace.$casedJoinTableName2 t2 ON t1.id = t2.id
+        |RIGHT JOIN $catalogAndNamespace.$casedJoinTableName2 t2
+        |ON t1.id = t2.id
         |""".stripMargin
 
     val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
      sql(sqlQuery).collect().toSeq
     }
+    assert(rows.nonEmpty)
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test right outer join without condition - no pushdown") {
+    val sqlQuery =
+      s"""
+        |SELECT * FROM
+        |$catalogAndNamespace.$casedJoinTableName1 a
+        |RIGHT JOIN
+        |$catalogAndNamespace.$casedJoinTableName2 b
+        |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      checkJoinNotPushed(df)
       checkAnswer(df, rows)
     }
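For a rough end-to-end picture, a hedged sketch of how the new behaviour could
be observed from a Spark session. The catalog and table names
(jdbc_catalog.ns.tbl1/tbl2) are hypothetical; the integration tests above use
the suite's checkJoinPushed/checkJoinNotPushed helpers rather than EXPLAIN:

    import org.apache.spark.sql.internal.SQLConf

    // Enable DSv2 join pushdown via the same conf key the tests toggle.
    spark.conf.set(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key, "true")

    val df = spark.sql(
      """SELECT t1.id, t2.surname
        |FROM jdbc_catalog.ns.tbl1 t1
        |LEFT JOIN jdbc_catalog.ns.tbl2 t2 ON t1.id = t2.id""".stripMargin)

    // With an ON condition, the optimized plan should collapse to a single JDBC
    // scan whose pushed-down SQL contains "LEFT JOIN"; without a condition, the
    // join remains in Spark.
    df.explain(true)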