This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 88dbe4297103 [SPARK-53066][SQL] Improve EXPLAIN output for DSv2 Join pushdown 88dbe4297103 is described below commit 88dbe42971038fbc162b186aded63fbb43e61ce8 Author: Petar Vasiljevic <petar.vasilje...@databricks.com> AuthorDate: Thu Aug 7 18:01:58 2025 +0800 [SPARK-53066][SQL] Improve EXPLAIN output for DSv2 Join pushdown ### What changes were proposed in this pull request? Prior to this change, the output of `EXPLAIN FORMATTED` on a query such as: ``` SELECT * FROM catalog.tbl1 t1 JOIN catalog.tbl2 t2 ON t1.id1 = t2.id2 JOIN catalog.tbl3 t3 ON t2.id2 = t3.id3 JOIN catalog.tbl4 t4 ON t3.id3 = t4.id4; ``` looked like: ``` PushedJoins: [join_pushdown_catalog.tbl1, join_pushdown_catalog.tbl2, join_pushdown_catalog.tbl3, join_pushdown_catalog.tbl4] ``` With the change from this PR, the output of `EXPLAIN FORMATTED` would be: ``` == Physical Plan == *(1) Project [ID#x, AMOUNT#x, ADDRESS#x, ID_1#x AS ID#x, NEXT_ID#x, SALARY#x, SURNAME#x, id_3#x AS id#x, id_1_2#x AS id_1#x, id_2#x, id_1_1#x, sid#x, id_4#x AS id#x, id_1_3#x AS id_1#x, id_2_2#x AS id_2#x, id_2_1#x, Sid_1#x AS Sid#x] +- *(1) Scan JDBC v1 Relation from v2 scan join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1[ID#x,AMOUNT#x,ADDRESS#x,ID_1#x,NEXT_ID#x,SALARY#x,SURNAME#x,id_3#x,id_1_2#x,id_2#x,id_1_1#x,sid#x,id_4#x,id_1_3#x,id_2_2#x,id_2_1#x,Sid_1#x] PushedFilters: [id_3 = (id_4 + 1)], PushedJoins: [L]: PushedFilters: [ID_1 = (id_3 + 1)] PushedJoins: [L]: PushedFilters: [ID = (ID_1 + 1)] PushedJoins: [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 PushedFilters: [ID IS NOT NULL] [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2 PushedFilters: [ID IS NOT NULL] [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3 PushedFilters: [id IS NOT NULL] [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4 PushedFilters: [id IS NOT NULL] , ReadSchema: 
struct<ID:int,AMOUNT:decimal(10,2),ADDRESS:string,ID_1:int,NEXT_ID:int,SALARY:decimal(10,2),SURNAME:string,id_3:int,id_1_2:int,id_2:int,id_1_1:int,sid:int,id_4:int,id_1_3:int,id_2_2:int,id_2_1:int,Sid_1:int> ``` PushedFilters on top of PushedJoins are actually join conditions. It can be seen that the name of `Scan JDBC v1 Relation from v2 scan` is ` catalog.tbl1`. This should be fixed as well, but it won't be a part of this PR. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? Closes #51781 from PetarVasiljevic-DB/improve_explain_command_for_dsv2_join_pushdown. Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../join/OracleJoinPushdownIntegrationSuite.scala | 46 ++++++++ .../spark/sql/execution/DataSourceScanExec.scala | 89 +++++++++++++-- .../execution/datasources/DataSourceStrategy.scala | 6 +- .../datasources/v2/PushedDownOperators.scala | 4 +- .../datasources/v2/V2ScanRelationPushDown.scala | 23 +++- .../connector/DataSourcePushdownTestUtils.scala | 10 +- .../JDBCV2JoinPushdownIntegrationSuiteBase.scala | 125 ++++++++++----------- 7 files changed, 220 insertions(+), 83 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala index ecc0c5489bce..ad9e5d002bf5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/OracleJoinPushdownIntegrationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc.v2.join import java.sql.Connection import java.util.Locale +import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.{DockerJDBCIntegrationSuite, JdbcDialect, OracleDatabaseOnDocker, OracleDialect} import org.apache.spark.sql.jdbc.v2.JDBCV2JoinPushdownIntegrationSuiteBase import org.apache.spark.sql.types.DataTypes @@ -56,6 +57,15 @@ import org.apache.spark.tags.DockerTest class OracleJoinPushdownIntegrationSuite extends DockerJDBCIntegrationSuite with JDBCV2JoinPushdownIntegrationSuiteBase { + override def excluded: Seq[String] = Seq( + // Following tests are harder to be supported for Oracle because Oracle connector does + // casts in predicates. There is a separate test in this suite that is similar to + // "Test explain formatted" test from base suite. + "Test self join with condition", + "Test multi-way self join with conditions", + "Test explain formatted" + ) + override val namespace: String = "SYSTEM" override val db = new OracleDatabaseOnDocker @@ -74,4 +84,40 @@ class OracleJoinPushdownIntegrationSuite override def dataPreparation(connection: Connection): Unit = { super.dataPreparation() } + + test("Test explain formatted - Oracle compatible") { + val sqlQuery = + s""" + |SELECT * FROM $catalogAndNamespace.$casedJoinTableName1 a + |JOIN $catalogAndNamespace.$casedJoinTableName2 b + |ON a.id = b.id + 1 + |JOIN $catalogAndNamespace.$casedJoinTableName3 c + |ON b.id = c.id + 1 + |JOIN $catalogAndNamespace.$casedJoinTableName4 d + |ON c.id = d.id + 1 + |""".stripMargin + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + // scalastyle:off line.size.limit + checkJoinPushed( + df, + s"""PushedFilters: [CAST(id_3 AS decimal(11,0)) = (id_4 + 1)], PushedJoins:\u0020 + |[L]: PushedFilters: [CAST(ID_1 AS decimal(11,0)) = (id_3 + 1)] + | PushedJoins: + | [L]: PushedFilters: [CAST(ID AS decimal(11,0)) = (ID_1 + 1)] + | PushedJoins: + | [L]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + | [R]: 
Relation: $catalogAndNamespace.${caseConvert(joinTableName2)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + | [R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName3)} + | PushedFilters: [id IS NOT NULL] + |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName4)} + | PushedFilters: [id IS NOT NULL]""".stripMargin + ) + // scalastyle:on line.size.limit + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 1c5dd572edfb..66e07aa4f7d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} -import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators +import org.apache.spark.sql.execution.datasources.v2.{PushedDownOperators, TableSampleInfo} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.ConstantColumnVector import org.apache.spark.sql.internal.SQLConf @@ -157,10 +157,12 @@ case class RowDataSourceScanExec( override def inputRDD: RDD[InternalRow] = rdd - override val metadata: Map[String, String] = { + private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") - def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") + private def pushedSampleMetadataString(s: TableSampleInfo): String = + s"SAMPLE (${(s.upperBound - s.lowerBound) * 100}) ${s.withReplacement} SEED(${s.seed})" + override val metadata: Map[String, String] = { val markedFilters = if (filters.nonEmpty) { for (filter <- filters) yield { if 
(handledFilters.contains(filter)) s"*$filter" else s"$filter" @@ -187,8 +189,11 @@ case class RowDataSourceScanExec( seqToString(markedFilters.toSeq) } - val pushedJoins = if (pushedDownOperators.joinedRelations.length > 1) { - Map("PushedJoins" -> seqToString(pushedDownOperators.joinedRelations)) + val pushedJoins = if (pushedDownOperators.joinedRelationPushedDownOperators.nonEmpty) { + Map("PushedJoins" -> + s"\n${getPushedJoinString( + pushedDownOperators.joinedRelationPushedDownOperators(0), + pushedDownOperators.joinedRelationPushedDownOperators(1))}\n") } else { Map() } @@ -202,12 +207,80 @@ case class RowDataSourceScanExec( seqToString(v.groupByExpressions.map(_.describe()).toImmutableArraySeq))} ++ topNOrLimitInfo ++ offsetInfo ++ - pushedDownOperators.sample.map(v => "PushedSample" -> - s"SAMPLE (${(v.upperBound - v.lowerBound) * 100}) ${v.withReplacement} SEED(${v.seed})" - ) ++ + pushedDownOperators.sample.map(v => "PushedSample" -> pushedSampleMetadataString(v)) ++ pushedJoins } + /** + * Builds the string for all pushed-down join operators. The method is recursive, so if there is a + * join on top of two already-joined relations, all of them will be present in the string.
+ * + * An example of the resulting string is the following: + * + * PushedFilters: [id_3 = (id_4 + 1)], PushedJoins: + * [L]: PushedFilters: [ID_1 = (id_3 + 1)] + * PushedJoins: + * [L]: PushedFilters: [ID = (ID_1 + 1)] + * PushedJoins: + * [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 + * PushedFilters: [ID IS NOT NULL] + * [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2 + * PushedFilters: [ID IS NOT NULL] + * [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3 + * PushedFilters: [id IS NOT NULL] + * [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4 + * PushedFilters: [id IS NOT NULL] + */ + private def getPushedJoinString( + leftSidePushedDownOperators: PushedDownOperators, + rightSidePushedDownOperators: PushedDownOperators, + indent: Int = 0): String = { + val indentStr = " ".repeat(indent) + + val leftSideOperators = buildOperatorParts(leftSidePushedDownOperators, indent) + val leftSideMetadataStr = formatMetadata(leftSideOperators, indentStr + " ".repeat(5)) + + val rightSideOperators = buildOperatorParts(rightSidePushedDownOperators, indent) + val rightSideMetadataStr = formatMetadata(rightSideOperators, indentStr + " ".repeat(5)) + + val leftSideString = s"$indentStr[L]: $leftSideMetadataStr" + val rightSideString = s"$indentStr[R]: $rightSideMetadataStr" + Seq(leftSideString, rightSideString).mkString("\n") + } + + private def buildOperatorParts(operators: PushedDownOperators, indent: Int): List[String] = { + val parts = List.newBuilder[String] + + // Add relation name for leaf nodes (nodes without further joins) + if (operators.joinedRelationPushedDownOperators.isEmpty) { + operators.relationName.foreach(name => parts += s"Relation: $name") + } + + if (operators.pushedPredicates.nonEmpty) { + parts += s"PushedFilters: ${seqToString(operators.pushedPredicates.map(_.describe()))}" + } + + operators.sample.foreach { sample => + parts += s"PushedSample: ${pushedSampleMetadataString(sample)}" + } + + // 
Recursively get the pushed join string for child with correct indentation. + if (operators.joinedRelationPushedDownOperators.nonEmpty) { + val nestedJoins = getPushedJoinString( + operators.joinedRelationPushedDownOperators(0), + operators.joinedRelationPushedDownOperators(1), + indent + 5) + parts += s"PushedJoins:\n$nestedJoins" + } + + parts.result() + } + + private def formatMetadata(parts: List[String], indentStr: String): String = { + val (basicParts, nestedJoinsParts) = parts.partition(!_.startsWith("PushedJoins:")) + (basicParts ++ nestedJoinsParts).mkString("\n" + indentStr) + } + // Don't care about `rdd` and `tableIdentifier`, and `stream` when canonicalizing. override def doCanonicalize(): SparkPlan = copy( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 6bc793f89278..21acc99db265 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -401,7 +401,7 @@ object DataSourceStrategy l.output.toStructType, Set.empty, Set.empty, - PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty), + PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty, None), toCatalystRDD(l, baseRelation.buildScan()), baseRelation, l.stream, @@ -476,7 +476,7 @@ object DataSourceStrategy requestedColumns.toStructType, pushedFilters.toSet, handledFilters, - PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty), + PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty, None), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, relation.stream, @@ -500,7 +500,7 @@ object DataSourceStrategy requestedColumns.toStructType, pushedFilters.toSet, handledFilters, - 
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty), + PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty, Seq.empty, None), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, relation.stream, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala index 668bfac0c452..c7d2c56b8985 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala @@ -31,6 +31,8 @@ case class PushedDownOperators( offset: Option[Int], sortValues: Seq[SortOrder], pushedPredicates: Seq[Predicate], - joinedRelations: Seq[String]) { + joinedRelationPushedDownOperators: Seq[PushedDownOperators], + // Relation name in case of leaf relation. For join nodes, this is empty. 
+ relationName: Option[String]) { assert((limit.isEmpty && sortValues.isEmpty) || limit.isDefined) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index f9bb4d62b32d..31a98e1ff96c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -199,9 +199,15 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { rightSideRequiredColumnsWithAliases, translatedCondition.get) ) { + val leftSidePushedDownOperators = getPushedDownOperators(leftHolder) + val rightSidePushedDownOperators = getPushedDownOperators(rightHolder) + leftHolder.joinedRelations = leftHolder.joinedRelations ++ rightHolder.joinedRelations - leftHolder.pushedPredicates = leftHolder.pushedPredicates ++ - rightHolder.pushedPredicates :+ translatedCondition.get + leftHolder.joinedRelationsPushedDownOperators = + Seq(leftSidePushedDownOperators, rightSidePushedDownOperators) + + leftHolder.pushedPredicates = Seq(translatedCondition.get) + leftHolder.pushedSample = None leftHolder.output = node.output.asInstanceOf[Seq[AttributeReference]] leftHolder.pushedJoinOutputMap = pushedJoinOutputMap @@ -791,13 +797,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { f.pushedFilters() case _ => Array.empty[sources.Filter] } - val pushedDownOperators = PushedDownOperators(sHolder.pushedAggregate, sHolder.pushedSample, - sHolder.pushedLimit, sHolder.pushedOffset, sHolder.sortOrders, sHolder.pushedPredicates, - sHolder.joinedRelations.map(_.name)) + val pushedDownOperators = getPushedDownOperators(sHolder) V1ScanWrapper(v1, pushedFilters.toImmutableArraySeq, pushedDownOperators) case _ => scan } } + + private def 
getPushedDownOperators(sHolder: ScanBuilderHolder): PushedDownOperators = { + val optRelationName = Option.when(sHolder.joinedRelations.length <= 1)(sHolder.relation.name) + PushedDownOperators(sHolder.pushedAggregate, sHolder.pushedSample, + sHolder.pushedLimit, sHolder.pushedOffset, sHolder.sortOrders, sHolder.pushedPredicates, + sHolder.joinedRelationsPushedDownOperators, optRelationName) + } } case class ScanBuilderHolder( @@ -820,6 +831,8 @@ case class ScanBuilderHolder( var joinedRelations: Seq[DataSourceV2RelationBase] = Seq(relation) + var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] = Seq.empty[PushedDownOperators] + var pushedJoinOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala index ee2bcd34bf3f..03cea0fb9d6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala @@ -145,13 +145,17 @@ trait DataSourcePushdownTestUtils extends ExplainSuiteHelper { assert(joinNodes.nonEmpty, "Join should not be pushed down") } - protected def checkJoinPushed(df: DataFrame, expectedTables: String*): Unit = { + protected def checkJoinPushed(df: DataFrame): Unit = { val joinNodes = df.queryExecution.optimizedPlan.collect { case j: Join => j } assert(joinNodes.isEmpty, "Join should be pushed down") - if (expectedTables.nonEmpty) { - checkPushedInfo(df, s"PushedJoins: [${expectedTables.mkString(", ")}]") + } + + protected def checkJoinPushed(df: DataFrame, expectedPushdownString: String): Unit = { + checkJoinPushed(df) + if (expectedPushdownString.nonEmpty) { + checkPushedInfo(df, expectedPushdownString) } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala index 7f267c738634..514c487cfae8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala @@ -56,11 +56,11 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase .set(s"spark.sql.catalog.$catalogName.pushDownOffset", "true") .set(s"spark.sql.catalog.$catalogName.caseSensitive", "false") - private def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}" - private def casedJoinTableName1 = caseConvert(joinTableName1) - private def casedJoinTableName2 = caseConvert(joinTableName2) - private def casedJoinTableName3 = caseConvert(joinTableName3) - private def casedJoinTableName4 = caseConvert(joinTableName4) + protected def catalogAndNamespace = s"$catalogName.${caseConvert(namespace)}" + protected def casedJoinTableName1 = caseConvert(joinTableName1) + protected def casedJoinTableName2 = caseConvert(joinTableName2) + protected def casedJoinTableName3 = caseConvert(joinTableName3) + protected def casedJoinTableName4 = caseConvert(joinTableName4) def qualifyTableName(tableName: String): String = { val fullyQualifiedCasedNamespace = jdbcDialect.quoteIdentifier(caseConvert(namespace)) @@ -287,11 +287,17 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) + // scalastyle:off line.size.limit checkJoinPushed( df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" + s"""PushedFilters: [${caseConvert("id")} = (${caseConvert("id_1")} + 1)], PushedJoins:\u0020 + |[L]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: 
[${caseConvert("id")} IS NOT NULL] + |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL]""" + .stripMargin ) + // scalastyle:on line.size.limit checkAnswer(df, rows) } } @@ -301,8 +307,7 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase |SELECT * FROM |$catalogAndNamespace.$casedJoinTableName1 a |JOIN $catalogAndNamespace.$casedJoinTableName1 b ON b.id = a.id + 1 - |JOIN $catalogAndNamespace.$casedJoinTableName1 c ON c.id = b.id - 1 - |""".stripMargin + |JOIN $catalogAndNamespace.$casedJoinTableName1 c ON c.id = b.id - 1""".stripMargin val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { sql(sqlQuery).collect().toSeq @@ -313,12 +318,20 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) + // scalastyle:off line.size.limit checkJoinPushed( df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" + s"""PushedFilters: [${caseConvert("id_2")} = (${caseConvert("id_1")} - 1)], PushedJoins:\u0020 + |[L]: PushedFilters: [${caseConvert("id_1")} = (${caseConvert("id")} + 1)] + | PushedJoins: + | [L]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + | [R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL]""".stripMargin ) + // scalastyle:on line.size.limit checkAnswer(df, rows) } } @@ -346,11 +359,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase ) ) checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, 
" + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" - ) checkAnswer(df, rows) } } @@ -377,14 +385,9 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase ) ) checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}", - s"$catalogAndNamespace.${caseConvert(joinTableName2)}" - ) checkPushedInfo(df, - s"PushedFilters: [${caseConvert("id")} IS NOT NULL, " + - s"${caseConvert("next_id")} IS NOT NULL, " + + s"PushedFilters: [${caseConvert("id")} IS NOT NULL", + s"${caseConvert("next_id")} IS NOT NULL", s"${caseConvert("id")} = ${caseConvert("next_id")}]") checkAnswer(df, rows) } @@ -417,11 +420,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase ) ) checkPrunedColumnsDataTypeAndNullability(df, expectedSchema) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}") checkAnswer(df, rows) } } @@ -459,12 +457,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val joinDf = df1.join(df2, "id") - - checkJoinPushed( - joinDf, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" - ) checkAnswer(joinDf, rows) } } @@ -484,12 +476,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase val df = sql(sqlQuery) checkAggregateRemoved(df, supportsAggregatePushdown) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" - ) - checkAnswer(df, rows) } } @@ -508,12 +494,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - - checkJoinPushed( - df, - expectedTables = 
s"$catalogAndNamespace.${caseConvert(joinTableName1)}," + - s" $catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}") checkAnswer(df, rows) } } @@ -538,12 +518,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase checkSortRemoved(df, supportsSortPushdown) checkLimitRemoved(df, supportsLimitPushdown) - - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " + - s"$catalogAndNamespace.${caseConvert(joinTableName1)}" - ) checkAnswer(df, rows) } } @@ -563,11 +537,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}", - s"$catalogAndNamespace.${caseConvert(joinTableName2)}" - ) checkFilterPushed(df, supportsFilterPushdown) checkAnswer(df, rows) } @@ -588,11 +557,6 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { val df = sql(sqlQuery) - checkJoinPushed( - df, - expectedTables = s"$catalogAndNamespace.${caseConvert(joinTableName1)}", - s"$catalogAndNamespace.${caseConvert(joinTableName2)}" - ) checkAnswer(df, rows) } } @@ -704,4 +668,39 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase checkJoinPushed(df) } } + + test("Test explain formatted") { + val sqlQuery = s""" + |SELECT * FROM $catalogAndNamespace.$casedJoinTableName1 a + |JOIN $catalogAndNamespace.$casedJoinTableName2 b + |ON a.id = b.id + 1 + |JOIN $catalogAndNamespace.$casedJoinTableName3 c + |ON b.id = c.id + 1 + |JOIN $catalogAndNamespace.$casedJoinTableName4 d + |ON c.id = d.id + 1 + |""".stripMargin + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + // scalastyle:off line.size.limit + checkJoinPushed( + df, + s"""PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:\u0020 + |[L]: PushedFilters: [${caseConvert("id_1")} = 
(id_3 + 1)] + | PushedJoins: + | [L]: PushedFilters: [${caseConvert("id")} = (${caseConvert("id_1")} + 1)] + | PushedJoins: + | [L]: Relation: $catalogAndNamespace.${caseConvert(joinTableName1)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + | [R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName2)} + | PushedFilters: [${caseConvert("id")} IS NOT NULL] + | [R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName3)} + | PushedFilters: [id IS NOT NULL] + |[R]: Relation: $catalogAndNamespace.${caseConvert(joinTableName4)} + | PushedFilters: [id IS NOT NULL]""".stripMargin + ) + // scalastyle:on line.size.limit + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org