This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88dbe4297103 [SPARK-53066][SQL] Improve EXPLAIN output for DSv2 Join 
pushdown
88dbe4297103 is described below

commit 88dbe42971038fbc162b186aded63fbb43e61ce8
Author: Petar Vasiljevic <petar.vasilje...@databricks.com>
AuthorDate: Thu Aug 7 18:01:58 2025 +0800

    [SPARK-53066][SQL] Improve EXPLAIN output for DSv2 Join pushdown
    
    ### What changes were proposed in this pull request?
    Prior to this change, `EXPLAIN FORMATTED` on query e.g.:
    
    ```
    SELECT *
    FROM catalog.tbl1 t1
        JOIN catalog.tbl2 t2 ON t1.id1 = t2.id2
        JOIN catalog.tbl3 t3 ON t2.id2 = t3.id3
        JOIN catalog.tbl4 t4 ON t3.id3 = t4.id4;
    ```
    
    looked like:
    ```
    PushedJoins: [join_pushdown_catalog.tbl1, join_pushdown_catalog.tbl2, 
join_pushdown_catalog.tlb3, join_pushdown_catalog.tlb4]
    ```
    
    With the change from PR, the output of `EXPLAIN FORMATTED` would be:
    
    ```
     == Physical Plan ==
       *(1) Project [ID#x, AMOUNT#x, ADDRESS#x, ID_1#x AS ID#x, NEXT_ID#x, 
SALARY#x, SURNAME#x, id_3#x AS id#x, id_1_2#x AS id_1#x, id_2#x, id_1_1#x, 
sid#x, id_4#x AS id#x, id_1_3#x AS id_1#x, id_2_2#x AS id_2#x, id_2_1#x, 
Sid_1#x AS Sid#x]
       +- *(1) Scan JDBC v1 Relation from v2 scan 
join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1[ID#x,AMOUNT#x,ADDRESS#x,ID_1#x,NEXT_ID#x,SALARY#x,SURNAME#x,id_3#x,id_1_2#x,id_2#x,id_1_1#x,sid#x,id_4#x,id_1_3#x,id_2_2#x,id_2_1#x,Sid_1#x]
 PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:
       [L]: PushedFilters: [ID_1 = (id_3 + 1)]
            PushedJoins:
            [L]: PushedFilters: [ID = (ID_1 + 1)]
                 PushedJoins:
                 [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
                      PushedFilters: [ID IS NOT NULL]
                 [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2
                      PushedFilters: [ID IS NOT NULL]
            [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3
                 PushedFilters: [id IS NOT NULL]
       [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4
            PushedFilters: [id IS NOT NULL]
       , ReadSchema: 
struct<ID:int,AMOUNT:decimal(10,2),ADDRESS:string,ID_1:int,NEXT_ID:int,SALARY:decimal(10,2),SURNAME:string,id_3:int,id_1_2:int,id_2:int,id_1_1:int,sid:int,id_4:int,id_1_3:int,id_2_2:int,id_2_1:int,Sid_1:int>
    ```
    
    PushedFilters on top of PushedJoins are actually join conditions.
    It can be seen that the name of `Scan JDBC v1 Relation from v2 scan` is ` 
catalog.tbl1`. This should be fixed as well, but it won't be a part of this PR.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #51781 from 
PetarVasiljevic-DB/improve_explain_command_for_dsv2_join_pushdown.
    
    Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../join/OracleJoinPushdownIntegrationSuite.scala  |  46 ++++++++
 .../spark/sql/execution/DataSourceScanExec.scala   |  89 +++++++++++++--
 .../execution/datasources/DataSourceStrategy.scala |   6 +-
 .../datasources/v2/PushedDownOperators.scala       |   4 +-
 .../datasources/v2/V2ScanRelationPushDown.scala    |  23 +++-
 .../connector/DataSourcePushdownTestUtils.scala    |  10 +-
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   | 125 ++++++++++-----------
 7 files changed, 220 insertions(+), 83 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala
index ecc0c5489bce..ad9e5d002bf5 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc.v2.join
 import java.sql.Connection
 import java.util.Locale
 
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{DockerJDBCIntegrationSuite, JdbcDialect, 
OracleDatabaseOnDocker, OracleDialect}
 import org.apache.spark.sql.jdbc.v2.JDBCV2JoinPushdownIntegrationSuiteBase
 import org.apache.spark.sql.types.DataTypes
@@ -56,6 +57,15 @@ import org.apache.spark.tags.DockerTest
 class OracleJoinPushdownIntegrationSuite
   extends DockerJDBCIntegrationSuite
   with JDBCV2JoinPushdownIntegrationSuiteBase {
+  override def excluded: Seq[String] = Seq(
+    // Following tests are harder to be supported for Oracle because Oracle 
connector does
+    // casts in predicates. There is a separate test in this suite that is 
similar to
+    // "Test explain formatted" test from base suite.
+    "Test self join with condition",
+    "Test multi-way self join with conditions",
+    "Test explain formatted"
+  )
+
   override val namespace: String = "SYSTEM"
 
   override val db = new OracleDatabaseOnDocker
@@ -74,4 +84,40 @@ class OracleJoinPushdownIntegrationSuite
   override def dataPreparation(connection: Connection): Unit = {
     super.dataPreparation()
   }
+
+  test("Test explain formatted - Oracle compatible") {
+    val sqlQuery =
+      s"""
+         |SELECT * FROM $catalogAndNamespace.$casedJoinTableName1 a
+         |JOIN $catalogAndNamespace.$casedJoinTableName2 b
+         |ON a.id = b.id + 1
+         |JOIN $catalogAndNamespace.$casedJoinTableName3 c
+         |ON b.id = c.id + 1
+         |JOIN $catalogAndNamespace.$casedJoinTableName4 d
+         |ON c.id = d.id + 1
+         |""".stripMargin
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      // scalastyle:off line.size.limit
+      checkJoinPushed(
+        df,
+        s"""PushedFilters: [CAST(id_3 AS decimal(11,0)) = (id_4 + 1)], 
PushedJoins:\u0020
+           |[L]: PushedFilters: [CAST(ID_1 AS decimal(11,0)) = (id_3 + 1)]
+           |     PushedJoins:
+           |     [L]: PushedFilters: [CAST(ID AS decimal(11,0)) = (ID_1 + 1)]
+           |          PushedJoins:
+           |          [L]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName1)}
+           |               PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |          [R]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName2)}
+           |               PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |     [R]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName3)}
+           |          PushedFilters: [id IS NOT NULL]
+           |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName4)}
+           |     PushedFilters: [id IS NOT NULL]""".stripMargin
+      )
+      // scalastyle:on line.size.limit
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1c5dd572edfb..66e07aa4f7d4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
-import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.datasources.v2.{PushedDownOperators, 
TableSampleInfo}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -157,10 +157,12 @@ case class RowDataSourceScanExec(
 
   override def inputRDD: RDD[InternalRow] = rdd
 
-  override val metadata: Map[String, String] = {
+  private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
-    def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
+  private def pushedSampleMetadataString(s: TableSampleInfo): String =
+    s"SAMPLE (${(s.upperBound - s.lowerBound) * 100}) ${s.withReplacement} 
SEED(${s.seed})"
 
+  override val metadata: Map[String, String] = {
     val markedFilters = if (filters.nonEmpty) {
       for (filter <- filters) yield {
         if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
@@ -187,8 +189,11 @@ case class RowDataSourceScanExec(
       seqToString(markedFilters.toSeq)
     }
 
-    val pushedJoins = if (pushedDownOperators.joinedRelations.length > 1) {
-      Map("PushedJoins" -> seqToString(pushedDownOperators.joinedRelations))
+    val pushedJoins = if 
(pushedDownOperators.joinedRelationPushedDownOperators.nonEmpty) {
+      Map("PushedJoins" ->
+        s"\n${getPushedJoinString(
+          pushedDownOperators.joinedRelationPushedDownOperators(0),
+          pushedDownOperators.joinedRelationPushedDownOperators(1))}\n")
     } else {
       Map()
     }
@@ -202,12 +207,80 @@ case class RowDataSourceScanExec(
             
seqToString(v.groupByExpressions.map(_.describe()).toImmutableArraySeq))} ++
       topNOrLimitInfo ++
       offsetInfo ++
-      pushedDownOperators.sample.map(v => "PushedSample" ->
-        s"SAMPLE (${(v.upperBound - v.lowerBound) * 100}) ${v.withReplacement} 
SEED(${v.seed})"
-      ) ++
+      pushedDownOperators.sample.map(v => "PushedSample" -> 
pushedSampleMetadataString(v)) ++
       pushedJoins
   }
 
+  /**
+   * Build string for all the pushed down join operators. The method is 
recursive, so if there is
+   * join on top of 2 already joined relations, all of these will be present 
in string.
+   *
+   * The exmaple of resulting string is the following:
+   *
+   * PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:
+   * [L]: PushedFilters: [ID_1 = (id_3 + 1)]
+   *      PushedJoins:
+   *      [L]: PushedFilters: [ID = (ID_1 + 1)]
+   *           PushedJoins:
+   *           [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
+   *                PushedFilters: [ID IS NOT NULL]
+   *           [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2
+   *                PushedFilters: [ID IS NOT NULL]
+   *      [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3
+   *           PushedFilters: [id IS NOT NULL]
+   * [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4
+   *      PushedFilters: [id IS NOT NULL]
+   */
+  private def getPushedJoinString(
+      leftSidePushedDownOperators: PushedDownOperators,
+      rightSidePushedDownOperators: PushedDownOperators,
+      indent: Int = 0): String = {
+    val indentStr = " ".repeat(indent)
+
+    val leftSideOperators = buildOperatorParts(leftSidePushedDownOperators, 
indent)
+    val leftSideMetadataStr = formatMetadata(leftSideOperators, indentStr + " 
".repeat(5))
+
+    val rightSideOperators = buildOperatorParts(rightSidePushedDownOperators, 
indent)
+    val rightSideMetadataStr = formatMetadata(rightSideOperators, indentStr + 
" ".repeat(5))
+
+    val leftSideString = s"$indentStr[L]: $leftSideMetadataStr"
+    val rightSideString = s"$indentStr[R]: $rightSideMetadataStr"
+    Seq(leftSideString, rightSideString).mkString("\n")
+  }
+
+  private def buildOperatorParts(operators: PushedDownOperators, indent: Int): 
List[String] = {
+    val parts = List.newBuilder[String]
+
+    // Add relation name for leaf nodes (nodes without further joins)
+    if (operators.joinedRelationPushedDownOperators.isEmpty) {
+      operators.relationName.foreach(name => parts += s"Relation: $name")
+    }
+
+    if (operators.pushedPredicates.nonEmpty) {
+      parts += s"PushedFilters: 
${seqToString(operators.pushedPredicates.map(_.describe()))}"
+    }
+
+    operators.sample.foreach { sample =>
+      parts += s"PushedSample: ${pushedSampleMetadataString(sample)}"
+    }
+
+    // Recursively get the pushed join string for child with correct 
indentation.
+    if (operators.joinedRelationPushedDownOperators.nonEmpty) {
+      val nestedJoins = getPushedJoinString(
+        operators.joinedRelationPushedDownOperators(0),
+        operators.joinedRelationPushedDownOperators(1),
+        indent + 5)
+      parts += s"PushedJoins:\n$nestedJoins"
+    }
+
+    parts.result()
+  }
+
+  private def formatMetadata(parts: List[String], indentStr: String): String = 
{
+    val (basicParts, nestedJoinsParts) = 
parts.partition(!_.startsWith("PushedJoins:"))
+    (basicParts ++ nestedJoinsParts).mkString("\n" + indentStr)
+  }
+
   // Don't care about `rdd` and `tableIdentifier`, and `stream` when 
canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6bc793f89278..21acc99db265 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -401,7 +401,7 @@ object DataSourceStrategy
         l.output.toStructType,
         Set.empty,
         Set.empty,
-        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty, None),
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
         l.stream,
@@ -476,7 +476,7 @@ object DataSourceStrategy
         requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
-        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty, None),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.stream,
@@ -500,7 +500,7 @@ object DataSourceStrategy
         requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
-        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, 
Seq.empty, None),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.stream,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala
index 668bfac0c452..c7d2c56b8985 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala
@@ -31,6 +31,8 @@ case class PushedDownOperators(
     offset: Option[Int],
     sortValues: Seq[SortOrder],
     pushedPredicates: Seq[Predicate],
-    joinedRelations: Seq[String]) {
+    joinedRelationPushedDownOperators: Seq[PushedDownOperators],
+    // Relation name in case of leaf relation. For join nodes, this is empty.
+    relationName: Option[String]) {
   assert((limit.isEmpty && sortValues.isEmpty) || limit.isDefined)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f9bb4d62b32d..31a98e1ff96c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -199,9 +199,15 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
           rightSideRequiredColumnsWithAliases,
           translatedCondition.get)
       ) {
+        val leftSidePushedDownOperators = getPushedDownOperators(leftHolder)
+        val rightSidePushedDownOperators = getPushedDownOperators(rightHolder)
+
         leftHolder.joinedRelations = leftHolder.joinedRelations ++ 
rightHolder.joinedRelations
-        leftHolder.pushedPredicates = leftHolder.pushedPredicates ++
-          rightHolder.pushedPredicates :+ translatedCondition.get
+        leftHolder.joinedRelationsPushedDownOperators =
+          Seq(leftSidePushedDownOperators, rightSidePushedDownOperators)
+
+        leftHolder.pushedPredicates = Seq(translatedCondition.get)
+        leftHolder.pushedSample = None
 
         leftHolder.output = node.output.asInstanceOf[Seq[AttributeReference]]
         leftHolder.pushedJoinOutputMap = pushedJoinOutputMap
@@ -791,13 +797,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
             f.pushedFilters()
           case _ => Array.empty[sources.Filter]
         }
-        val pushedDownOperators = PushedDownOperators(sHolder.pushedAggregate, 
sHolder.pushedSample,
-          sHolder.pushedLimit, sHolder.pushedOffset, sHolder.sortOrders, 
sHolder.pushedPredicates,
-          sHolder.joinedRelations.map(_.name))
+        val pushedDownOperators = getPushedDownOperators(sHolder)
         V1ScanWrapper(v1, pushedFilters.toImmutableArraySeq, 
pushedDownOperators)
       case _ => scan
     }
   }
+
+  private def getPushedDownOperators(sHolder: ScanBuilderHolder): 
PushedDownOperators = {
+    val optRelationName = Option.when(sHolder.joinedRelations.length <= 
1)(sHolder.relation.name)
+    PushedDownOperators(sHolder.pushedAggregate, sHolder.pushedSample,
+      sHolder.pushedLimit, sHolder.pushedOffset, sHolder.sortOrders, 
sHolder.pushedPredicates,
+      sHolder.joinedRelationsPushedDownOperators, optRelationName)
+  }
 }
 
 case class ScanBuilderHolder(
@@ -820,6 +831,8 @@ case class ScanBuilderHolder(
 
   var joinedRelations: Seq[DataSourceV2RelationBase] = Seq(relation)
 
+  var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] = 
Seq.empty[PushedDownOperators]
+
   var pushedJoinOutputMap: AttributeMap[Expression] = 
AttributeMap.empty[Expression]
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
index ee2bcd34bf3f..03cea0fb9d6f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
@@ -145,13 +145,17 @@ trait DataSourcePushdownTestUtils extends 
ExplainSuiteHelper {
     assert(joinNodes.nonEmpty, "Join should not be pushed down")
   }
 
-  protected def checkJoinPushed(df: DataFrame, expectedTables: String*): Unit 
= {
+  protected def checkJoinPushed(df: DataFrame): Unit = {
     val joinNodes = df.queryExecution.optimizedPlan.collect {
       case j: Join => j
     }
     assert(joinNodes.isEmpty, "Join should be pushed down")
-    if (expectedTables.nonEmpty) {
-      checkPushedInfo(df, s"PushedJoins: [${expectedTables.mkString(", ")}]")
+  }
+
+  protected def checkJoinPushed(df: DataFrame, expectedPushdownString: 
String): Unit = {
+    checkJoinPushed(df)
+    if (expectedPushdownString.nonEmpty) {
+      checkPushedInfo(df, expectedPushdownString)
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 7f267c738634..514c487cfae8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -56,11 +56,11 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     .set(s"spark.sql.catalog.$catalogName.pushDownOffset", "true")
     .set(s"spark.sql.catalog.$catalogName.caseSensitive", "false")
 
-  private def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}"
-  private def casedJoinTableName1 = caseConvert(joinTableName1)
-  private def casedJoinTableName2 = caseConvert(joinTableName2)
-  private def casedJoinTableName3 = caseConvert(joinTableName3)
-  private def casedJoinTableName4 = caseConvert(joinTableName4)
+  protected def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}"
+  protected def casedJoinTableName1 = caseConvert(joinTableName1)
+  protected def casedJoinTableName2 = caseConvert(joinTableName2)
+  protected def casedJoinTableName3 = caseConvert(joinTableName3)
+  protected def casedJoinTableName4 = caseConvert(joinTableName4)
 
   def qualifyTableName(tableName: String): String = {
     val fullyQualifiedCasedNamespace = 
jdbcDialect.quoteIdentifier(caseConvert(namespace))
@@ -287,11 +287,17 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
 
+      // scalastyle:off line.size.limit
       checkJoinPushed(
         df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
+        s"""PushedFilters: [${caseConvert("id")} = (${caseConvert("id_1")} + 
1)], PushedJoins:\u0020
+           |[L]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)}
+           |     PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)}
+           |     PushedFilters: [${caseConvert("id")} IS NOT NULL]"""
+          .stripMargin
       )
+      // scalastyle:on line.size.limit
       checkAnswer(df, rows)
     }
   }
@@ -301,8 +307,7 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       |SELECT * FROM
       |$catalogAndNamespace.$casedJoinTableName1 a
       |JOIN $catalogAndNamespace.$casedJoinTableName1 b ON b.id = a.id + 1
-      |JOIN $catalogAndNamespace.$casedJoinTableName1 c ON c.id = b.id - 1
-      |""".stripMargin
+      |JOIN $catalogAndNamespace.$casedJoinTableName1 c ON c.id = b.id - 
1""".stripMargin
 
     val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
       sql(sqlQuery).collect().toSeq
@@ -313,12 +318,20 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
 
+      // scalastyle:off line.size.limit
       checkJoinPushed(
         df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
+        s"""PushedFilters: [${caseConvert("id_2")} = (${caseConvert("id_1")} - 
1)], PushedJoins:\u0020
+           |[L]: PushedFilters: [${caseConvert("id_1")} = 
(${caseConvert("id")} + 1)]
+           |     PushedJoins:
+           |     [L]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName1)}
+           |          PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |     [R]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName1)}
+           |          PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)}
+           |     PushedFilters: [${caseConvert("id")} IS NOT 
NULL]""".stripMargin
       )
+      // scalastyle:on line.size.limit
       checkAnswer(df, rows)
     }
   }
@@ -346,11 +359,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
         )
       )
       checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
-      )
       checkAnswer(df, rows)
     }
   }
@@ -377,14 +385,9 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
         )
       )
       checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
-          s"$catalogAndNamespace.${caseConvert(joinTableName2)}"
-      )
       checkPushedInfo(df,
-        s"PushedFilters: [${caseConvert("id")} IS NOT NULL, " +
-          s"${caseConvert("next_id")} IS NOT NULL, " +
+        s"PushedFilters: [${caseConvert("id")} IS NOT NULL",
+          s"${caseConvert("next_id")} IS NOT NULL",
           s"${caseConvert("id")} = ${caseConvert("next_id")}]")
       checkAnswer(df, rows)
     }
@@ -417,11 +420,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
         )
       )
       checkPrunedColumnsDataTypeAndNullability(df, expectedSchema)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}")
       checkAnswer(df, rows)
     }
   }
@@ -459,12 +457,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val joinDf = df1.join(df2, "id")
-
-      checkJoinPushed(
-        joinDf,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
-      )
       checkAnswer(joinDf, rows)
     }
   }
@@ -484,12 +476,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       val df = sql(sqlQuery)
 
       checkAggregateRemoved(df, supportsAggregatePushdown)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
-      )
-
       checkAnswer(df, rows)
     }
   }
@@ -508,12 +494,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
-
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}," +
-          s" $catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}")
       checkAnswer(df, rows)
     }
   }
@@ -538,12 +518,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
       checkSortRemoved(df, supportsSortPushdown)
       checkLimitRemoved(df, supportsLimitPushdown)
-
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
-          s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
-      )
       checkAnswer(df, rows)
     }
   }
@@ -563,11 +537,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
-          s"$catalogAndNamespace.${caseConvert(joinTableName2)}"
-      )
       checkFilterPushed(df, supportsFilterPushdown)
       checkAnswer(df, rows)
     }
@@ -588,11 +557,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
 
     withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
       val df = sql(sqlQuery)
-      checkJoinPushed(
-        df,
-        expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
-          s"$catalogAndNamespace.${caseConvert(joinTableName2)}"
-      )
       checkAnswer(df, rows)
     }
   }
@@ -704,4 +668,39 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       checkJoinPushed(df)
     }
   }
+
+  test("Test explain formatted") {
+    val sqlQuery = s"""
+      |SELECT * FROM $catalogAndNamespace.$casedJoinTableName1 a
+      |JOIN $catalogAndNamespace.$casedJoinTableName2 b
+      |ON a.id = b.id + 1
+      |JOIN $catalogAndNamespace.$casedJoinTableName3 c
+      |ON b.id = c.id + 1
+      |JOIN $catalogAndNamespace.$casedJoinTableName4 d
+      |ON c.id = d.id + 1
+      |""".stripMargin
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      // scalastyle:off line.size.limit
+      checkJoinPushed(
+        df,
+        s"""PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:\u0020
+           |[L]: PushedFilters: [${caseConvert("id_1")} = (id_3 + 1)]
+           |     PushedJoins:
+           |     [L]: PushedFilters: [${caseConvert("id")} = 
(${caseConvert("id_1")} + 1)]
+           |          PushedJoins:
+           |          [L]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName1)}
+           |               PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |          [R]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName2)}
+           |               PushedFilters: [${caseConvert("id")} IS NOT NULL]
+           |     [R]: Relation: 
$catalogAndNamespace.${caseConvert(joinTableName3)}
+           |          PushedFilters: [id IS NOT NULL]
+           |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName4)}
+           |     PushedFilters: [id IS NOT NULL]""".stripMargin
+      )
+      // scalastyle:on line.size.limit
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to