This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2d2a0d95a69f [SPARK-53527][SQL] Improve fallback of analyzeExistenceDefaultValue
2d2a0d95a69f is described below

commit 2d2a0d95a69f97a75428908aa881625ebb576ffd
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Tue Sep 9 11:26:16 2025 +0800

    [SPARK-53527][SQL] Improve fallback of analyzeExistenceDefaultValue
    
    https://github.com/apache/spark/pull/49962 added a fallback for catalogs that already
    contain broken (ie, non-resolved) persisted default values. A broken value is something
    like 'current_database', 'current_user', or 'current_timestamp': these expressions are
    non-deterministic and produce wrong results in EXISTS_DEFAULT, where the user expects the
    value that was resolved at the time they set the default.
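
    For context, a small illustration (a sketch only, assuming spark-catalyst on the
    classpath; the field name and timestamp literal below are made up) of the difference
    between a properly resolved EXISTS_DEFAULT and a "broken" one as described above:

    ```
    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

    // Resolved at DDL time: replaying this stored SQL later always yields the same value.
    val resolvedDefault = StructField("created", StringType, nullable = true,
      new MetadataBuilder()
        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
          "'2025-09-09 11:26:16'")
        .build())

    // "Broken": the expression was persisted unresolved, so replaying it later yields the
    // current value at read time instead of the value captured when the default was set.
    val brokenDefault = StructField("created", StringType, nullable = true,
      new MetadataBuilder()
        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
          "CURRENT_TIMESTAMP")
        .build())
    ```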
    
    Add yet another fallback for broken persisted default values, in this case ones that
    contain nested function calls.
    
    Take the case where the EXISTS_DEFAULT is:
    ```CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))```
    
    The current code `Literal.fromSQL(defaultSQL)` throws the following exception before
    reaching the fallback:
    ```
    Caused by: java.lang.AssertionError: assertion failed: function arguments must be resolved.
            at scala.Predef$.assert(Predef.scala:279)
            at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.$anonfun$expressionBuilder$1(FunctionRegistry.scala:1278)
            at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction(FunctionRegistry.scala:251)
            at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction$(FunctionRegistry.scala:245)
            at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:317)
            at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:325)
            at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:317)
            at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:586)
            at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:121)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:586)
            at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:579)
            at scala.collection.immutable.List.map(List.scala:251)
            at scala.collection.immutable.List.map(List.scala:79)
            at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:768)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:579)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:556)
            at org.apache.spark.sql.catalyst.expressions.Literal$.fromSQL(literals.scala:317)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyzeExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:393)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:529)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:524)
            at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:524)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:594)
            at scala.Option.getOrElse(Option.scala:201)
            at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:592)
    ```
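
    A minimal reproduction sketch of the failure above (assuming spark-catalyst on the
    classpath; the try/catch is only for illustration, it is not how the patch handles the
    error):

    ```
    import org.apache.spark.sql.catalyst.expressions.Literal

    // The persisted EXISTS_DEFAULT string from the example above.
    val existsDefault = "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))"

    // The outer CONCAT's builder is looked up while its arguments (YEAR(...), LPAD(...)) are
    // themselves still unresolved function calls, so the function registry's
    // "function arguments must be resolved" assertion fires before any fallback can run.
    try {
      Literal.fromSQL(existsDefault)
    } catch {
      case e: AssertionError => println(s"Literal.fromSQL failed: ${e.getMessage}")
    }
    ```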
    
    No user-facing change.
    
    Tested by adding a unit test in StructTypeSuite.
    
    No generative AI tooling was used.
    
    Closes #52274 from szehon-ho/more_default_value_fallback.
    
    Authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 2f305b6817bca87c975a470f5801c3cd3742c04b)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 41 +++++++++++++---------
 .../apache/spark/sql/types/StructTypeSuite.scala   | 11 ++++++
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index b24ad30e0719..72a8c8539bd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
@@ -368,27 +369,33 @@ object ResolveDefaultColumns extends QueryErrorsBase
     val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
 
     // Parse the expression.
-    val expr = Literal.fromSQL(defaultSQL) match {
-      // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
-      // hence we need to add timezone to the cast if necessary
-      case c: Cast if c.child.resolved && c.needsTimeZone =>
-        c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
-      case e: Expression => e
-    }
+    val resolvedExpr = Try(Literal.fromSQL(defaultSQL)) match {
+      case Success(literal) =>
+        val expr = literal match {
+          // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
+          // hence we need to add timezone to the cast if necessary
+          case c: Cast if c.child.resolved && c.needsTimeZone =>
+            c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
+          case e: Expression => e
+        }
 
-    // Check invariants
-    if (expr.containsPattern(PLAN_EXPRESSION)) {
-      throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
-        "", field.name, defaultSQL)
-    }
+        // Check invariants
+        if (expr.containsPattern(PLAN_EXPRESSION)) {
+          throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
+            "", field.name, defaultSQL)
+        }
+
+        expr match {
+          case _: ExprLiteral => expr
+          case c: Cast if c.resolved => expr
+          case _ =>
+            fallbackResolveExistenceDefaultValue(field)
+        }
 
-    val resolvedExpr = expr match {
-      case _: ExprLiteral => expr
-      case c: Cast if c.resolved => expr
-      case _ =>
+      case Failure(_) =>
+        // If Literal.fromSQL fails, use fallback resolution
         fallbackResolveExistenceDefaultValue(field)
     }
-
     coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 5dd45d3d4496..42579f6cc6ee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -856,6 +856,13 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
               "CAST(CURRENT_TIMESTAMP AS BIGINT)")
             
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
               "CAST(CURRENT_TIMESTAMP AS BIGINT)")
+            .build()),
+        StructField("c3", StringType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+              "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
             .build())))
     val res = ResolveDefaultColumns.existenceDefaultValues(source)
     assert(res(0) == null)
@@ -864,5 +871,9 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
     val res2Wrapper = new LongWrapper
     assert(res(2).asInstanceOf[UTF8String].toLong(res2Wrapper))
     assert(res2Wrapper.value > 0)
+
+    val res3Wrapper = new LongWrapper
+    assert(res(3).asInstanceOf[UTF8String].toLong(res3Wrapper))
+    assert(res3Wrapper.value > 0)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
