This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 44db44c5ea8f [SPARK-49110][SQL] Simplify SubqueryAlias.metadataOutput
to always propagate metadata columns
44db44c5ea8f is described below
commit 44db44c5ea8f75fe8f42c659b61f8f739da375f5
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Jan 30 22:18:08 2026 +0800
[SPARK-49110][SQL] Simplify SubqueryAlias.metadataOutput to always
propagate metadata columns
### What changes were proposed in this pull request?
This PR simplifies `SubqueryAlias.metadataOutput` to always propagate
metadata columns from its child, rather than only propagating when the child is
a `LeafNode` or another `SubqueryAlias`.
The previous implementation was introduced in SPARK-40149 as a workaround
to forbid queries like `SELECT m FROM (SELECT a FROM t)` while still allowing
DataFrame API chaining. However, this created an inconsistency since
`SubqueryAlias` should conceptually just rename/qualify columns, not filter
which ones are accessible.
With this change:
- `SubqueryAlias` always propagates `metadataOutput` (with qualifier
applied)
- The `qualifiedAccessOnly` filter is preserved to handle natural join
metadata columns
- Queries like `SELECT m FROM (SELECT a FROM t) AS alias` now work,
consistent with how `Project` already propagates metadata columns
### Why are the changes needed?
1. **Consistency**: `SubqueryAlias` is a rename operation and should not
selectively block metadata column propagation
2. **Simpler code**: Removes the special-case logic checking for
`LeafNode`/`SubqueryAlias` children
3. **Better error messages**: When metadata columns from both sides of a
join have the same name, users now get an "ambiguous reference" error rather
than "column not found"
### Does this PR introduce _any_ user-facing change?
Yes, queries that previously failed with "column not found" when accessing
metadata columns through a subquery alias will now succeed (if unambiguous) or
fail with "ambiguous reference" (if multiple columns have the same name).
### How was this patch tested?
Updated existing tests and added new test for ambiguous metadata columns
after join with SubqueryAlias.
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Closes #53861 from cloud-fan/meta_col.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../plans/logical/basicLogicalOperators.scala | 12 ++--
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++
.../spark/sql/connector/MetadataColumnSuite.scala | 71 ++++++++++++++++++++--
3 files changed, 84 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 2bbf21016a91..6b1b068234de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1757,11 +1757,15 @@ case class SubqueryAlias(
}
override def metadataOutput: Seq[Attribute] = {
-    // Propagate metadata columns from leaf nodes through a chain of `SubqueryAlias`.
-    if (child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]) {
+    val canPropagate = if (conf.getConf(SQLConf.SUBQUERY_ALIAS_ALWAYS_PROPAGATE_METADATA_COLUMNS)) {
+      true
+    } else {
+      // Legacy behavior: only propagate metadata columns if child is a LeafNode or SubqueryAlias.
+      child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]
+    }
+    if (canPropagate) {
       val qualifierList = identifier.qualifier :+ alias
-      val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.qualifiedAccessOnly)
-      nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
+      child.metadataOutput.filter(!_.qualifiedAccessOnly).map(_.withQualifier(qualifierList))
} else {
Nil
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e86466826019..cd892936c9b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -281,6 +281,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+  val SUBQUERY_ALIAS_ALWAYS_PROPAGATE_METADATA_COLUMNS =
+    buildConf("spark.sql.analyzer.subqueryAliasAlwaysPropagateMetadataColumns")
+      .internal()
+      .version("4.2.0")
+      .doc(
+        "When true, SubqueryAlias always propagates metadata columns from its child. " +
+        "When false, SubqueryAlias only propagates metadata columns if the child is a " +
+        "LeafNode or another SubqueryAlias (legacy behavior).")
+      .booleanConf
+      .createWithDefault(true)
+
+
   val BLOCK_CREATE_TEMP_TABLE_USING_PROVIDER =
     buildConf("spark.sql.legacy.blockCreateTempTableUsingProvider")
       .doc("If enabled, we fail legacy CREATE TEMPORARY TABLE ... USING provider during parsing.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 741e30a739f5..3bfd57e867c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
class MetadataColumnSuite extends DatasourceV2SQLBase {
@@ -190,7 +191,7 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
}
}
-  test("SPARK-34923: propagate metadata columns through SubqueryAlias if child is leaf node") {
+ test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
val sbq = "sbq"
withTable(tbl) {
prepareTable()
@@ -203,12 +204,43 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
}
- assertThrows[AnalysisException] {
- sql(s"SELECT $sbq.index FROM (SELECT id FROM $tbl) $sbq")
- }
- assertThrows[AnalysisException] {
- spark.table(tbl).select($"id").as(sbq).select(s"$sbq.index")
+    // Metadata columns are propagated through SubqueryAlias even if child is not a leaf node.
+ checkAnswer(
+ sql(s"SELECT $sbq.index FROM (SELECT id FROM $tbl) $sbq"),
+ Seq(Row(0), Row(0), Row(0))
+ )
+ checkAnswer(
+ spark.table(tbl).select($"id").as(sbq).select(s"$sbq.index"),
+ Seq(Row(0), Row(0), Row(0))
+ )
+ }
+ }
+
+ test("ambiguous metadata columns after join with SubqueryAlias") {
+ val tbl2 = "testcat.t2"
+ withTable(tbl, tbl2) {
+ prepareTable()
+    sql(s"CREATE TABLE $tbl2 (id2 bigint, value string) PARTITIONED BY (bucket(4, id2), id2)")
+ sql(s"INSERT INTO $tbl2 VALUES (1, 'x'), (2, 'y'), (3, 'z')")
+
+ // Both tables have 'index' metadata column. When joined and aliased,
+ // accessing 'j.index' is ambiguous.
+ val ambiguousError = intercept[AnalysisException] {
+      sql(s"SELECT j.index FROM ($tbl JOIN $tbl2 ON $tbl.id = $tbl2.id2) AS j")
}
+ assert(ambiguousError.getMessage.contains("ambiguous"))
+
+ // Accessing with the original table qualifier works without alias.
+ checkAnswer(
+      sql(s"SELECT t.index, t2.index FROM $tbl AS t JOIN $tbl2 AS t2 ON t.id = t2.id2"),
+ Seq(Row(0, 0), Row(0, 0), Row(0, 0))
+ )
+
+ // Accessing non-ambiguous columns through SubqueryAlias works fine
+ checkAnswer(
+      sql(s"SELECT j.data, j.value FROM ($tbl JOIN $tbl2 ON $tbl.id = $tbl2.id2) AS j"),
+ Seq(Row("a", "x"), Row("b", "y"), Row("c", "z"))
+ )
}
}
@@ -356,4 +388,31 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
assert(cols.head.metadataInJSON() == null)
}
}
+
+  test("SPARK-49110: Project a metadata column while reading a padded char column") {
+ withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> "true") {
+ withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id bigint, data char(1)) PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      val expected = Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))
+
+ // Unqualified column access
+      checkAnswer(sql(s"SELECT id, data, index, _partition FROM $tbl"), expected)
+      checkAnswer(spark.table(tbl).select("id", "data", "index", "_partition"), expected)
+
+ // Qualified column access without table alias (using full table path)
+ checkAnswer(
+        sql(s"SELECT $tbl.id, $tbl.data, $tbl.index, $tbl._partition FROM $tbl"),
+ expected)
+
+ // Qualified column access with table alias
+ checkAnswer(
+ sql(s"SELECT t.id, t.data, t.index, t._partition FROM $tbl AS t"),
+ expected)
+ checkAnswer(
+        spark.table(tbl).as("t").select("t.id", "t.data", "t.index", "t._partition"),
+ expected)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]