This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 1c99423d81fd [SPARK-54735][SQL][REVERT] Revert column comments preservation in view with SCHEMA EVOLUTION
1c99423d81fd is described below
commit 1c99423d81fdbfa443fcb0a9521611563c713bb0
Author: Milan Dankovic <[email protected]>
AuthorDate: Wed May 13 20:19:35 2026 +0800
[SPARK-54735][SQL][REVERT] Revert column comments preservation in view with SCHEMA EVOLUTION
This reverts commit e886428cdfd3f59232bc037e8665b70e75863d25.
### What changes were proposed in this pull request?
This PR reverts the previous change that prevented user-defined comments in
schema evolution views from being overwritten by comments from the underlying
table.
### Why are the changes needed?
The original change altered how column comments are handled for views with
SCHEMA EVOLUTION enabled. After further discussion, we concluded that some
users may already depend on the existing behavior, in which view column
comments are refreshed from the underlying table during schema evolution.
Because this can be considered a breaking change, and given the proximity
to the release cut, we decided to revert the original PR for now to preserve
backward compatibility and avoid potential regressions for existing workloads.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #55849 from miland-db/milan-dankovic_data/revert-SPARK-54735.
Authored-by: Milan Dankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 965056eb99a90f9b52f701e6615f7df53ad01be5)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 14 --
.../spark/sql/execution/datasources/rules.scala | 46 +-----
.../results/view-schema-evolution.sql.out | 4 +-
.../spark/sql/execution/SQLViewTestSuite.scala | 159 +--------------------
.../SparkConfigBindingPolicySuite.scala | 2 +-
5 files changed, 8 insertions(+), 217 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9bb5b98cfabf..fcb736e40485 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2319,17 +2319,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS =
- buildConf("spark.sql.view.schemaEvolution.preserveUserComments")
- .internal()
- .doc("When enabled, views with SCHEMA EVOLUTION mode will preserve
user-set view comments " +
- "when the underlying table schema evolves. When disabled, view
comments will be " +
- "overwritten with table comments on every schema sync.")
- .version("4.2.0")
- .withBindingPolicy(ConfigBindingPolicy.SESSION)
- .booleanConf
- .createWithDefault(true)
-
val OUTPUT_COMMITTER_CLASS =
buildConf("spark.sql.sources.outputCommitterClass")
.version("1.4.0")
.internal()
@@ -8056,9 +8045,6 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def viewSchemaCompensation: Boolean = getConf(VIEW_SCHEMA_COMPENSATION)
- def viewSchemaEvolutionPreserveUserComments: Boolean =
- getConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS)
-
def defaultCacheStorageLevel: StorageLevel =
StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL).name())
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ff6c1e067406..ce41bbe4aeb3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -40,7 +40,7 @@ import
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType,
MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField,
StructType}
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.ArrayImplicits._
@@ -737,19 +737,6 @@ case class QualifyLocationWithWarehouse(catalog:
SessionCatalog) extends Rule[Lo
* It does so by walking the resolved plan looking for View operators for
persisted views.
*/
object ViewSyncSchemaToMetaStore extends (LogicalPlan => Unit) {
-
- /**
- * Checks if comment changes between view and table should trigger schema
sync.
- * When preserveUserComments flag is enabled, comment differences should NOT
trigger sync
- * because we want to preserve user-set view comments.
- */
- private def shouldTriggerRedoOnCommentChange(
- viewField: StructField,
- tableField: StructField,
- preserveUserComments: Boolean): Boolean = {
- !preserveUserComments && viewField.getComment() != tableField.getComment()
- }
-
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case View(metaData, false, viewQuery, _)
@@ -768,44 +755,19 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan =>
Unit) {
(field.dataType != planField.dataType ||
field.nullable != planField.nullable ||
(viewSchemaMode == SchemaEvolution && (
- field.name != planField.name ||
- shouldTriggerRedoOnCommentChange(
- field,
- planField,
-
session.sessionState.conf.viewSchemaEvolutionPreserveUserComments))))
+ field.getComment() != planField.getComment() ||
+ field.name != planField.name)))
}
- lazy val viewFieldsByName = viewFields.map(f => f.name -> f).toMap
-
if (redo) {
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
val newFields = viewQuery.schema.map {
case StructField(name, dataType, nullable, _) =>
StructField(name, dataType, nullable,
- viewFieldsByName(name).metadata)
- }
- StructType(newFields)
- } else if
(session.sessionState.conf.viewSchemaEvolutionPreserveUserComments) {
- // Adopt types/nullable/names from query, but preserve view
comments.
- val newFields = viewQuery.schema.map { planField =>
- val newMetadata = viewFieldsByName.get(planField.name) match {
- case Some(viewField) =>
- // Use table metadata but override with view comment
- val builder = new
MetadataBuilder().withMetadata(planField.metadata)
- viewField.getComment() match {
- case Some(comment) => builder.putString("comment", comment)
- case None => builder.remove("comment")
- }
- builder.build()
- case None =>
- // New column, use table metadata as-is
- planField.metadata
- }
- StructField(planField.name, planField.dataType,
planField.nullable, newMetadata)
+ viewFields.find(_.name == name).get.metadata)
}
StructType(newFields)
} else {
- // Legacy behavior: adopt everything from table including comments.
viewQuery.schema
}
SchemaUtils.checkColumnNameDuplication(fieldNames.toImmutableArraySeq,
diff --git
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
index 497e307f592f..7410e7eaafd6 100644
---
a/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
+++
b/sql/core/src/test/resources/sql-tests/results/view-schema-evolution.sql.out
@@ -897,8 +897,8 @@ DESCRIBE EXTENDED v
-- !query schema
struct<col_name:string,data_type:string,comment:string>
-- !query output
-c1 bigint c1 6d
-c2 string c2 6d
+c1 bigint c1 6e
+c2 string c2 6e
# Detailed Table Information
Catalog spark_catalog
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index f55d4b8cb9e6..1e5b1cdfaeee 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
/**
@@ -881,163 +881,6 @@ class PersistedViewTestSuite extends SQLViewTestSuite
with SharedSparkSession {
}
}
- test("Schema evolution views should preserve manually set comments") {
- withTable("t") {
- withView("v") {
- // Create table with comments.
- sql("CREATE TABLE t (c1 INT COMMENT " +
- "'table comment 1', c2 STRING COMMENT 'table comment 2')")
- sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
- // Create view with schema evolution (no column list) - initially
adopts table comments.
- sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
-
- // Verify initial comments from table are adopted.
- val descInitial = sql("DESCRIBE EXTENDED v").collect()
- val c1CommentInitial = descInitial.filter(r => r.getString(0) == "c1")
- val c2CommentInitial = descInitial.filter(r => r.getString(0) == "c2")
- assert(c1CommentInitial.nonEmpty && c1CommentInitial(0).getString(2)
== "table comment 1",
- "Initial c1 comment should be 'table comment 1' from table")
- assert(c2CommentInitial.nonEmpty && c2CommentInitial(0).getString(2)
== "table comment 2",
- "Initial c2 comment should be 'table comment 2' from table")
-
- // Simulate user manually changing view comments (via UI or ALTER
COLUMN).
- val catalog = spark.sessionState.catalog
- val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
- val newSchema = StructType(Seq(
- StructField("c1", IntegerType, nullable = true).withComment("user
comment 1"),
- StructField("c2", StringType, nullable = true).withComment("user
comment 2")
- ))
- catalog.alterTable(viewMeta.copy(schema = newSchema))
-
- // Verify manually set comments.
- val descManual = sql("DESCRIBE EXTENDED v").collect()
- val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
- val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
- assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2) ==
"user comment 1",
- "c1 comment should be 'user comment 1'")
- assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2) ==
"user comment 2",
- "c2 comment should be 'user comment 2'")
-
- // SELECT from view (triggers ViewSyncSchemaToMetaStore).
- checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b"),
Row(3, "c")))
-
- // Verify manually set comments are PRESERVED (not reverted to table
comments).
- val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
- val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c1")
- val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c2")
- assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) ==
"user comment 1",
- "c1 comment should still be 'user comment 1' after SELECT (bug: was
reverted)")
- assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) ==
"user comment 2",
- "c2 comment should still be 'user comment 2' after SELECT (bug: was
reverted)")
-
- // Verify that type changes are still adopted.
- sql("DROP TABLE t")
- sql("CREATE TABLE t (c1 BIGINT COMMENT 'table comment changed', " +
- "c2 DOUBLE COMMENT 'table comment changed 2')")
- sql("INSERT INTO t VALUES (4, 5.0), (6, 7.0)")
-
- // SELECT from view - should adopt new types but preserve view
comments.
- checkAnswer(sql("SELECT * FROM v"), Seq(Row(4, 5.0), Row(6, 7.0)))
-
- // Verify types changed but comments preserved.
- val descAfterTypeChange = sql("DESCRIBE EXTENDED v").collect()
- val c1Final = descAfterTypeChange.filter(r => r.getString(0) == "c1")
- val c2Final = descAfterTypeChange.filter(r => r.getString(0) == "c2")
- assert(c1Final.nonEmpty && c1Final(0).getString(1) == "bigint",
- "c1 type should be updated to bigint")
- assert(c2Final.nonEmpty && c2Final(0).getString(1) == "double",
- "c2 type should be updated to double")
- assert(c1Final.nonEmpty && c1Final(0).getString(2) == "user comment 1",
- "c1 comment should still be 'user comment 1' (preserved)")
- assert(c2Final.nonEmpty && c2Final(0).getString(2) == "user comment 2",
- "c2 comment should still be 'user comment 2' (preserved)")
- }
- }
- }
-
- test("Schema evolution comments legacy behavior with
preserveUserComments=false") {
- withSQLConf(VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS.key -> "false") {
- withTable("t") {
- withView("v") {
- // Create table with comments.
- sql("CREATE TABLE t (c1 INT COMMENT " +
- "'table comment 1', c2 STRING COMMENT 'table comment 2')")
- sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
-
- // Create view with schema evolution.
- sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
-
- // User manually changes view comments.
- val catalog = spark.sessionState.catalog
- val viewMeta = catalog.getTableMetadata(TableIdentifier("v"))
- val newSchema = StructType(Seq(
- StructField("c1", IntegerType, nullable = true).withComment("user
comment 1"),
- StructField("c2", StringType, nullable = true).withComment("user
comment 2")
- ))
- catalog.alterTable(viewMeta.copy(schema = newSchema))
-
- // Verify manually set comments.
- val descManual = sql("DESCRIBE EXTENDED v").collect()
- val c1CommentManual = descManual.filter(r => r.getString(0) == "c1")
- val c2CommentManual = descManual.filter(r => r.getString(0) == "c2")
- assert(c1CommentManual.nonEmpty && c1CommentManual(0).getString(2)
== "user comment 1")
- assert(c2CommentManual.nonEmpty && c2CommentManual(0).getString(2)
== "user comment 2")
-
- // SELECT from view (triggers ViewSyncSchemaToMetaStore).
- checkAnswer(sql("SELECT * FROM v"), Seq(Row(1, "a"), Row(2, "b")))
-
- // With flag=false, comments should REVERT to table comments (legacy
behavior).
- val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
- val c1CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c1")
- val c2CommentAfter = descAfterSelect.filter(r => r.getString(0) ==
"c2")
- assert(c1CommentAfter.nonEmpty && c1CommentAfter(0).getString(2) ==
"table comment 1",
- "c1 comment should revert to 'table comment 1' (legacy behavior)")
- assert(c2CommentAfter.nonEmpty && c2CommentAfter(0).getString(2) ==
"table comment 2",
- "c2 comment should revert to 'table comment 2' (legacy behavior)")
- }
- }
- }
- }
-
- test("Comments are preserved when table comment changes with
preserveUserComments=true") {
- withTable("t") {
- withView("v") {
- // Create table with initial comment.
- sql("CREATE TABLE t (c1 INT COMMENT 'original table comment')")
- sql("INSERT INTO t VALUES (1), (2)")
-
- // Create view with schema evolution - inherits table comment.
- sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * FROM t")
-
- // Verify view has inherited the table comment.
- val descInitial = sql("DESCRIBE EXTENDED v").collect()
- val c1Initial = descInitial.filter(r => r.getString(0) == "c1")
- assert(c1Initial.nonEmpty && c1Initial(0).getString(2) == "original
table comment",
- "View should inherit table comment")
-
- // Change the table comment.
- sql("ALTER TABLE t CHANGE COLUMN c1 c1 INT COMMENT 'new table
comment'")
-
- // Verify table comment changed.
- val descTable = sql("DESCRIBE EXTENDED t").collect()
- val c1Table = descTable.filter(r => r.getString(0) == "c1")
- assert(c1Table.nonEmpty && c1Table(0).getString(2) == "new table
comment",
- "Table comment should be updated")
-
- // SELECT from view (triggers ViewSyncSchemaToMetaStore).
- checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
-
- // Verify view still has the original inherited comment (frozen).
- val descAfterSelect = sql("DESCRIBE EXTENDED v").collect()
- val c1AfterSelect = descAfterSelect.filter(r => r.getString(0) == "c1")
- assert(c1AfterSelect.nonEmpty &&
- c1AfterSelect(0).getString(2) == "original table comment",
- "View should preserve inherited comment even when table comment
changes")
- }
- }
- }
-
def getShowCreateDDL(view: String, serde: Boolean = false): String = {
val result = if (serde) {
sql(s"SHOW CREATE TABLE $view AS SERDE")
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/configaudit/SparkConfigBindingPolicySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/configaudit/SparkConfigBindingPolicySuite.scala
index 7b04db0788bd..e7b4669e2b96 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/configaudit/SparkConfigBindingPolicySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/configaudit/SparkConfigBindingPolicySuite.scala
@@ -36,7 +36,7 @@ class SparkConfigBindingPolicySuite extends SparkFunSuite {
test("Test adding bindingPolicy to config") {
val allConfigs = SQLConf.getConfigEntries().asScala.filter { entry =>
- entry.key == SQLConf.VIEW_SCHEMA_EVOLUTION_PRESERVE_USER_COMMENTS.key
+ entry.key == SQLConf.PLAN_CHANGE_LOG_LEVEL.key
}
assert(allConfigs.head.bindingPolicy.isDefined)
assert(allConfigs.head.bindingPolicy.get == ConfigBindingPolicy.SESSION)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]