This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bd864a085f2 [SPARK-41660][SQL] Only propagate metadata columns if they are used bd864a085f2 is described below commit bd864a085f2764b8ccdfe67ffaf7400b6f44f717 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Dec 21 21:07:34 2022 +0800 [SPARK-41660][SQL] Only propagate metadata columns if they are used ### What changes were proposed in this pull request? Ideally it's OK to always propagate metadata columns, as column pruning will kick in later and prune them away if they are not used. However, it may cause problems in cases like CTE. https://github.com/apache/spark/pull/39081 fixed such a bug. This PR only propagates metadata columns if they are used, to keep the analyzed plan simple and reliable. ### Why are the changes needed? avoid potential bugs. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #39152 from cloud-fan/follow. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 15 +++++++++------ .../spark/sql/connector/MetadataColumnSuite.scala | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e959e7208a4..c21ff7bd90f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -978,7 +978,7 @@ class Analyzer(override val catalogManager: CatalogManager) if (metaCols.isEmpty) { node } else { - val newNode = addMetadataCol(node) + val newNode = addMetadataCol(node, attr => metaCols.exists(_.exprId == attr.exprId)) // We should not change the output schema of the plan. 
We should project away the extra // metadata columns if necessary. if (newNode.sameOutput(node)) { @@ -1012,15 +1012,18 @@ class Analyzer(override val catalogManager: CatalogManager) }) } - private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match { - case s: ExposesMetadataColumns => s.withMetadataColumns() - case p: Project => + private def addMetadataCol( + plan: LogicalPlan, + isRequired: Attribute => Boolean): LogicalPlan = plan match { + case s: ExposesMetadataColumns if s.metadataOutput.exists(isRequired) => + s.withMetadataColumns() + case p: Project if p.metadataOutput.exists(isRequired) => val newProj = p.copy( projectList = p.projectList ++ p.metadataOutput, - child = addMetadataCol(p.child)) + child = addMetadataCol(p.child, isRequired)) newProj.copyTagsFrom(p) newProj - case _ => plan.withNewChildren(plan.children.map(addMetadataCol)) + case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, isRequired))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala index 8454b9f85ec..9abf0fd59e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.struct class MetadataColumnSuite extends DatasourceV2SQLBase { @@ -232,4 +233,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase { ) } } + + test("SPARK-41660: only propagate metadata columns if they are used") { + withTable(tbl) { + prepareTable() + val df = sql(s"SELECT t2.id FROM $tbl t1 JOIN $tbl t2 USING (id)") + val scans = df.logicalPlan.collect { + case d: DataSourceV2Relation => d + } + 
assert(scans.length == 2) scans.foreach { scan => // The query only accesses join hidden columns, and scan nodes should not expose its metadata // columns. assert(scan.output.map(_.name) == Seq("id", "data")) } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org