This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 91bfbbae6746 [SPARK-53348][SQL][4.0] Always persist ANSI value when creating a view or assume it when querying if not stored
91bfbbae6746 is described below

commit 91bfbbae674681589fb6d53c3739ca7d164b627c
Author: mihailoale-db <mihailo.alek...@databricks.com>
AuthorDate: Wed Aug 27 13:33:04 2025 -0700

    [SPARK-53348][SQL][4.0] Always persist ANSI value when creating a view or assume it when querying if not stored

    ### What changes were proposed in this pull request?
    I propose that we always store the ANSI value when creating a view, because otherwise users can be affected by unwanted behavior. For example, a user who creates a view on a version where ANSI = false by default expects the following not to fail:
    ```
    CREATE VIEW view AS SELECT CAST('abc' AS INT) AS a;
    SELECT * FROM view;
    ```
    But if the user queries the view on a version where ANSI = true by default, the query above fails (because the value is stored only when explicitly set, and when it is not stored we fall back to the default). The number of such use cases is huge, because the ANSI impact area is huge, and thus I propose that we always store the value. If the value is not stored, I propose that we use the createVersion field to determine whether the ANSI value should be true (Spark 4.0.0 and above) or false (lower than Spark 4.0.0). If the createVersion field wasn't stored during view creation either, I propose that we assume ANSI = false, because the number of such views is incomparably larger than the number of views expecting ANSI = true.

    ### Why are the changes needed?
    To improve user experience.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Added a suite.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #52147 from mihailoale-db/ansi40backport.

    Authored-by: mihailoale-db <mihailo.alek...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
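[Editor's note, not part of the commit message: the snippet below is a minimal sketch of how the new behavior can be observed, assuming a build that contains this patch. The view name is hypothetical; the property key `view.sqlConfig.spark.sql.ansi.enabled` is the one checked by the new test suite in this commit.]

```
-- Create a view whose result depends on ANSI mode (NULL vs. runtime error for the bad cast).
CREATE VIEW v_ansi_demo AS SELECT CAST('abc' AS INT) AS a;

-- With this patch, the session's ANSI value is captured at creation time and should appear
-- among the view properties, e.g. as view.sqlConfig.spark.sql.ansi.enabled.
SHOW TBLPROPERTIES v_ansi_demo;

-- Querying the view applies the captured value, so the behavior no longer flips when the
-- default ANSI value changes across Spark versions.
SELECT * FROM v_ansi_demo;
```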
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  24 +++-
 .../sql/catalyst/analysis/ViewResolution.scala     |   8 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  15 ++-
 .../plans/logical/basicLogicalOperators.scala      |  12 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 ++
 .../command/CreateUserDefinedFunctionCommand.scala |  19 ++-
 .../apache/spark/sql/execution/command/views.scala |  15 ++-
 .../apache/spark/sql/DefaultANSIValueSuite.scala   | 136 +++++++++++++++++++++
 8 files changed, 225 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1045e73f2242..8c16cca97d85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -258,7 +258,10 @@ object Analyzer {
     "spark.sql.expressionTreeChangeLog.level"
   )
 
-  def retainResolutionConfigsForAnalysis(newConf: SQLConf, existingConf: SQLConf): Unit = {
+  def retainResolutionConfigsForAnalysis(
+      newConf: SQLConf,
+      existingConf: SQLConf,
+      createSparkVersion: String = ""): Unit = {
     val retainedConfigs = existingConf.getAllConfs.filter { case (key, _) =>
       // Also apply catalog configs
       RETAINED_ANALYSIS_FLAGS.contains(key) || key.startsWith("spark.sql.catalog.")
@@ -267,6 +270,25 @@
     retainedConfigs.foreach { case (k, v) =>
       newConf.settings.put(k, v)
     }
+
+    trySetAnsiValue(newConf, createSparkVersion)
+  }
+
+  /**
+   * In case ANSI value wasn't persisted for a view or a UDF, we set it to `true` in case Spark
+   * version used to create the view is 4.0.0 or higher. We set it to `false` in case Spark version
+   * is lower than 4.0.0 or if the Spark version wasn't stored (in that case we assume that the
+   * value is `false`)
+   */
+  def trySetAnsiValue(sqlConf: SQLConf, createSparkVersion: String = ""): Unit = {
+    if (conf.getConf(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED) &&
+      !sqlConf.settings.containsKey(SQLConf.ANSI_ENABLED.key)) {
+      if (createSparkVersion.startsWith("4.")) {
+        sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "true")
+      } else {
+        sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "false")
+      }
+    }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala
index 89ef29ddaaf1..585b8635031e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala
@@ -41,7 +41,13 @@
         view
       )
     }
-    SQLConf.withExistingConf(View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)) {
+    SQLConf.withExistingConf(
+      View.effectiveSQLConf(
+        configs = view.desc.viewSQLConfigs,
+        isTempView = view.isTempView,
+        createSparkVersion = view.desc.createVersion
+      )
+    ) {
       resolveChild(view.child)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0e5ff57def89..e34d402ccd36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -982,7 +982,13 @@
       objectType = Some("VIEW"),
       objectName = Some(metadata.qualifiedName)
     )
-    val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
+    val parsedPlan = SQLConf.withExistingConf(
+      View.effectiveSQLConf(
+        configs = viewConfigs,
+        isTempView = isTempView,
+        createSparkVersion = metadata.createVersion
+      )
+    ) {
       CurrentOrigin.withOrigin(origin) {
         parser.parseQuery(viewText)
       }
@@ -1010,7 +1016,11 @@
     // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
     // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
     // number of duplications, and pick the corresponding attribute by ordinal.
-    val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
+    val viewConf = View.effectiveSQLConf(
+      configs = metadata.viewSQLConfigs,
+      isTempView = isTempView,
+      createSparkVersion = metadata.createVersion
+    )
     val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
       identity
     } else {
@@ -1617,6 +1627,7 @@ class SessionCatalog(
     // Use captured SQL configs when parsing a SQL function.
     val conf = new SQLConf()
     function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
+    Analyzer.trySetAnsiValue(conf)
     SQLConf.withExistingConf(conf) {
       val inputParam = function.inputParam
       val returnType = function.getScalarFuncReturnType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 3f1cc65a13e1..6fb3bc9e6267 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -835,7 +835,10 @@ case class View(
 }
 
 object View {
-  def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = {
+  def effectiveSQLConf(
+      configs: Map[String, String],
+      isTempView: Boolean,
+      createSparkVersion: String = ""): SQLConf = {
     val activeConf = SQLConf.get
     // For temporary view, we always use captured sql configs
     if (activeConf.useCurrentSQLConfigsForView && !isTempView) return activeConf
@@ -844,7 +847,12 @@
     for ((k, v) <- configs) {
       sqlConf.settings.put(k, v)
     }
-    Analyzer.retainResolutionConfigsForAnalysis(newConf = sqlConf, existingConf = activeConf)
+    Analyzer.retainResolutionConfigsForAnalysis(
+      newConf = sqlConf,
+      existingConf = activeConf,
+      createSparkVersion = createSparkVersion
+    )
+
     sqlConf
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index da39b24813f9..8427c8320d5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -5594,6 +5594,15 @@
       .booleanConf
       .createWithDefault(true)
 
+  val ASSUME_ANSI_FALSE_IF_NOT_PERSISTED =
+    buildConf("spark.sql.assumeAnsiFalseIfNotPersisted.enabled")
+      .internal()
+      .doc("If enabled, assume ANSI mode is false if not persisted during view or UDF " +
+        "creation. Otherwise use the default value.")
+      .version("4.0.1")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala
index 1ee3c8a4c388..122e7d9b7ef9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog.{LanguageSQL, RoutineLanguage, UserDefinedFunctionErrors}
@@ -87,10 +89,25 @@
    * [[org.apache.spark.sql.catalyst.expressions.ExpressionInfo]], all SQL configs and other
    * function properties (such as the function parameters and the function return type)
    * are saved together in a property map.
+   *
+   * Here we only capture the SQL configs that are modifiable and should be captured, i.e. not in
+   * the denyList and in the allowList. Besides mentioned ones we also capture `ANSI_ENABLED`.
+   *
+   * We need to always capture them to make sure we apply the same configs when querying the
+   * function.
    */
   def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
     val modifiedConfs = ViewHelper.getModifiedConf(conf)
-    modifiedConfs.map { case (key, value) => s"$SQL_CONFIG_PREFIX$key" -> value }
+
+    val alwaysCaptured = Seq(SQLConf.ANSI_ENABLED)
+      .filter(c => !modifiedConfs.contains(c.key))
+      .map(c => (c.key, conf.getConf(c).toString))
+
+    val props = new mutable.HashMap[String, String]
+    for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
+      props.put(s"$SQL_CONFIG_PREFIX$key", value)
+    }
+    props.toMap
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index c73fedc3697e..8ed3509cf726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -481,16 +481,19 @@ object ViewHelper extends SQLConfHelper with Logging {
   }
 
   /**
-   * Convert the view SQL configs to `properties`.
+   * Convert the view SQL configs to `properties`. Here we only capture the SQL configs that are
+   * modifiable and should be captured, i.e. not in the denyList and in the allowList. We also
+   * capture `SESSION_LOCAL_TIMEZONE` whose default value relies on the JVM system timezone and
+   * the `ANSI_ENABLED` value.
+   *
+   * We need to always capture them to make sure we apply the same configs when querying the view.
    */
   private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
     val modifiedConfs = getModifiedConf(conf)
-    // Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose
-    // default value relies on the JVM system timezone. We need to always capture them to
-    // to make sure we apply the same configs when reading the view.
-    val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE)
+
+    val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE, SQLConf.ANSI_ENABLED)
       .filter(c => !modifiedConfs.contains(c.key))
-      .map(c => (c.key, conf.getConf(c)))
+      .map(c => (c.key, conf.getConf(c).toString))
 
     val props = new mutable.HashMap[String, String]
     for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DefaultANSIValueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DefaultANSIValueSuite.scala
new file mode 100644
index 000000000000..da4502462018
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DefaultANSIValueSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.SQLScalarFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SQLFunction}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This suite tests if default ANSI value is persisted for views and functions if not explicitly
+ * set.
+ */
+class DefaultANSIValueSuite extends QueryTest with SharedSparkSession {
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
+      implicit pos: Position): Unit = {
+    if (!sys.env.get("SPARK_ANSI_SQL_MODE").contains("false")) {
+      super.test(testName, testTags: _*)(testFun)
+    }
+  }
+
+  protected override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED.key, "true")
+  }
+
+  private val testViewName = "test_view"
+  private val testFunctionName = "test_function"
+
+  test("Default ANSI value is stored for views") {
+    withView(testViewName) {
+      testView(expectedAnsiValue = true)
+    }
+  }
+
+  test("Explicitly set ANSI value is respected over default one for views") {
+    withView(testViewName) {
+      withSQLConf("spark.sql.ansi.enabled" -> "false") {
+        testView(expectedAnsiValue = false)
+      }
+    }
+
+    withView(testViewName) {
+      withSQLConf("spark.sql.ansi.enabled" -> "true") {
+        testView(expectedAnsiValue = true)
+      }
+    }
+  }
+
+  test("Default ANSI value is stored for functions") {
+    withUserDefinedFunction(testFunctionName -> false) {
+      testFunction(expectedAnsiValue = true)
+    }
+  }
+
+  test("Explicitly set ANSI value is respected over default one for functions") {
+    withUserDefinedFunction(testFunctionName -> false) {
+      withSQLConf("spark.sql.ansi.enabled" -> "false") {
+        testFunction(expectedAnsiValue = false)
+      }
+    }
+
+    withUserDefinedFunction(testFunctionName -> false) {
+      withSQLConf("spark.sql.ansi.enabled" -> "true") {
+        testFunction(expectedAnsiValue = true)
+      }
+    }
+  }
+
+  test("ANSI value is set to false if not persisted for views") {
+    val catalogTable = new CatalogTable(
+      identifier = TableIdentifier(testViewName),
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType(),
+      properties = Map.empty[String, String]
+    )
+    val view = View(desc = catalogTable, isTempView = false, child = OneRowRelation())
+
+    val sqlConf = View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)
+
+    assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false")
+  }
+
+  private def testView(expectedAnsiValue: Boolean): Unit = {
+    sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias")
+
+    val viewMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName))
+
+    assert(
+      viewMetadata.properties("view.sqlConfig.spark.sql.ansi.enabled") == expectedAnsiValue.toString
+    )
+  }
+
+  private def testFunction(expectedAnsiValue: Boolean): Unit = {
+    sql(
+      s"""
+         |CREATE OR REPLACE FUNCTION $testFunctionName()
+         |RETURN SELECT CAST('string' AS BIGINT) AS alias
+         |""".stripMargin)
+
+    val df = sql(s"select $testFunctionName()")
+
+    assert(
+      df.queryExecution.analyzed.asInstanceOf[Project]
+        .projectList.head.asInstanceOf[Alias]
+        .child.asInstanceOf[SQLScalarFunction]
+        .function.asInstanceOf[SQLFunction]
+        .properties.get("sqlConfig.spark.sql.ansi.enabled").get == expectedAnsiValue.toString
+    )
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
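[Editor's note: for completeness, a minimal SQL sketch of the SQL UDF path that the new DefaultANSIValueSuite exercises. The function name is hypothetical, and the description assumes a build containing this patch, where the creation-time ANSI value is captured under the function's `sqlConfig.spark.sql.ansi.enabled` property.]

```
-- Create a SQL scalar function whose body is ANSI-sensitive (same shape as the new test).
CREATE OR REPLACE FUNCTION f_ansi_demo() RETURN SELECT CAST('abc' AS BIGINT) AS v;

-- With the patch, the session's ANSI value is captured into the function's properties at
-- creation time, so the cast keeps its creation-time semantics (NULL vs. error) even if the
-- querying session has a different default ANSI setting.
SELECT f_ansi_demo();
```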