This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2d2a0d95a69f [SPARK-53527][SQL] Improve fallback of analyzeExistenceDefaultValue
2d2a0d95a69f is described below

commit 2d2a0d95a69f97a75428908aa881625ebb576ffd
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Tue Sep 9 11:26:16 2025 +0800

    [SPARK-53527][SQL] Improve fallback of analyzeExistenceDefaultValue

https://github.com/apache/spark/pull/49962 added a fallback for the case where broken (i.e., non-resolved) default values were already persisted in catalogs. A broken value is something like 'current_database', 'current_user', or 'current_timestamp': these are non-deterministic, so leaving them unresolved in EXISTS_DEFAULT yields wrong results, since the user expects the value that was resolved at the time they set the default.

This change adds yet another fallback for a broken default value, in this case one containing nested function calls. Take the case where the EXISTS_DEFAULT is:

```
CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))
```

The current code, `Literal.fromSQL(defaultSQL)`, throws the exception below before ever reaching the fallback:

```
Caused by: java.lang.AssertionError: assertion failed: function arguments must be resolved.
    at scala.Predef$.assert(Predef.scala:279)
    at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.$anonfun$expressionBuilder$1(FunctionRegistry.scala:1278)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction(FunctionRegistry.scala:251)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction$(FunctionRegistry.scala:245)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:317)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:325)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:317)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:586)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:121)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:586)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:579)
    at scala.collection.immutable.List.map(List.scala:251)
    at scala.collection.immutable.List.map(List.scala:79)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:768)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:579)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:556)
    at org.apache.spark.sql.catalyst.expressions.Literal$.fromSQL(literals.scala:317)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyzeExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:393)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:529)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:524)
    at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:524)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:594)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:592)
```
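For context, a minimal sketch of the guard this patch adds around the parse, using Spark's internal `Literal.fromSQL` as shown in the diff below; `parseOrFallback` and its `fallback` parameter are hypothetical names standing in for the inlined logic and `fallbackResolveExistenceDefaultValue`:

```
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// scala.util.Try catches NonFatal throwables, and java.lang.AssertionError
// counts as NonFatal, so the assertion thrown inside Literal.fromSQL lands
// in the Failure branch instead of propagating to the caller.
def parseOrFallback(defaultSQL: String)(fallback: => Expression): Expression =
  Try(Literal.fromSQL(defaultSQL)) match {
    case Success(expr) => expr
    case Failure(_) => fallback
  }
```

The NonFatal behavior is the crux here: a plain `try`/`catch` on `Exception` would not intercept the `AssertionError` shown in the trace above, since `AssertionError` is an `Error`.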
Does this PR introduce any user-facing change?
No

How was this patch tested?
Add unit test in StructTypeSuite

Was this patch authored or co-authored using generative AI tooling?
No

Closes #52274 from szehon-ho/more_default_value_fallback.

Authored-by: Szehon Ho <szehon.apa...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 2f305b6817bca87c975a470f5801c3cd3742c04b)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala | 41 +++++++++++++---------
 .../apache/spark/sql/types/StructTypeSuite.scala  | 11 ++++++
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index b24ad30e0719..72a8c8539bd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.util
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
@@ -368,27 +369,33 @@ object ResolveDefaultColumns extends QueryErrorsBase
     val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
 
     // Parse the expression.
-    val expr = Literal.fromSQL(defaultSQL) match {
-      // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
-      // hence we need to add timezone to the cast if necessary
-      case c: Cast if c.child.resolved && c.needsTimeZone =>
-        c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
-      case e: Expression => e
-    }
+    val resolvedExpr = Try(Literal.fromSQL(defaultSQL)) match {
+      case Success(literal) =>
+        val expr = literal match {
+          // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
+          // hence we need to add timezone to the cast if necessary
+          case c: Cast if c.child.resolved && c.needsTimeZone =>
+            c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
+          case e: Expression => e
+        }
 
-    // Check invariants
-    if (expr.containsPattern(PLAN_EXPRESSION)) {
-      throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
-        "", field.name, defaultSQL)
-    }
+        // Check invariants
+        if (expr.containsPattern(PLAN_EXPRESSION)) {
+          throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
+            "", field.name, defaultSQL)
+        }
+
+        expr match {
+          case _: ExprLiteral => expr
+          case c: Cast if c.resolved => expr
+          case _ =>
+            fallbackResolveExistenceDefaultValue(field)
+        }
 
-    val resolvedExpr = expr match {
-      case _: ExprLiteral => expr
-      case c: Cast if c.resolved => expr
-      case _ =>
+      case Failure(_) =>
+        // If Literal.fromSQL fails, use fallback resolution
         fallbackResolveExistenceDefaultValue(field)
     }
-
     coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
   }
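As background for the `Cast` branch kept inside the `Success` case above: EXISTS_DEFAULT values carry a cast added by `analyze()`, and a timezone-dependent cast only becomes resolvable once a zone is attached. A rough, self-contained illustration of that behavior, assuming catalyst's `Cast` API behaves as sketched here:

```
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// CAST(string AS TIMESTAMP) depends on the session timezone, so the cast
// stays unresolved until a zone id is supplied via withTimeZone.
val c = Cast(Literal("2025-01-01 00:00:00"), TimestampType)
assert(c.needsTimeZone)
assert(!c.resolved)
assert(c.withTimeZone("UTC").resolved)
```

This is why the success path still rewrites qualifying casts with `withTimeZone(SQLConf.get.sessionLocalTimeZone)` before checking whether the expression resolved.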
"CAST(CURRENT_TIMESTAMP AS BIGINT)") .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(CURRENT_TIMESTAMP AS BIGINT)") + .build()), + StructField("c3", StringType, true, + new MetadataBuilder() + .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, + "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))") + .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, + "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))") .build()))) val res = ResolveDefaultColumns.existenceDefaultValues(source) assert(res(0) == null) @@ -864,5 +871,9 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { val res2Wrapper = new LongWrapper assert(res(2).asInstanceOf[UTF8String].toLong(res2Wrapper)) assert(res2Wrapper.value > 0) + + val res3Wrapper = new LongWrapper + assert(res(3).asInstanceOf[UTF8String].toLong(res3Wrapper)) + assert(res3Wrapper.value > 0) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org