This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b7afa7afdb8 [SPARK-46958][SQL] Add missing timezone to coerce default 
values
1b7afa7afdb8 is described below

commit 1b7afa7afdb83c2b59158904659a232b6e85ca82
Author: Kent Yao <[email protected]>
AuthorDate: Tue Feb 6 12:47:59 2024 +0800

    [SPARK-46958][SQL] Add missing timezone to coerce default values
    
    ### What changes were proposed in this pull request?
    
    Add missing timezone for `Cast`s in ResolveDefaultColumn
    - For the canUpcast branch, if the timezone is missing, an INTERNAL_ERROR is 
reported after the DDL succeeds, because the Cast is left `unresolved`.
    ```java
    [info] - SPARK-46958: Fix bug when canUpcast and result non-foldable 
expression *** FAILED *** (36 milliseconds)
    [info]   org.apache.spark.SparkException: [INTERNAL_ERROR] Found the 
unresolved Cast: cast(2018-11-17 00:00:00 as string) SQLSTATE: XX000
    [info] == SQL (line 1, position 1) ==
    [info] INSERT INTO t (key) VALUES(1)
    [info] ^^^^^^^^^^^^^^^^^^^
    ```
    - For the `else` branch, where we cast a literal of a wider type to a narrower 
one for convenience when the value fits, the DDL fails directly if the 
timezone is missing.
    
    ```java
    [info] - SPARK-46958: timestamp should have timezone for resolvable when 
default values fit *** FAILED *** (5 milliseconds)
    [info]   org.apache.spark.sql.AnalysisException: 
[INVALID_DEFAULT_VALUE.DATA_TYPE] Failed to execute CREATE TABLE command 
because the destination column or variable `c` has a DEFAULT value 
'2018-11-17', which requires "TIMESTAMP" type, but the statement provided a 
value of incompatible "STRING" type. SQLSTATE: 42623
    ```
    ### Why are the changes needed?
    
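    Bug fix: the `Cast`s created while coercing default values were missing a 
timezone, which caused the failures shown above.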
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new test
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45000 from yaooqinn/SPARK-46958.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 49 +++++++++++-----------
 .../spark/sql/ResolveDefaultColumnsSuite.scala     | 27 +++++++++++-
 2 files changed, 50 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index a2bfc6e08da8..3735f1ce8e63 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.util
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
@@ -44,7 +45,8 @@ import org.apache.spark.util.ArrayImplicits._
  */
 object ResolveDefaultColumns extends QueryErrorsBase
   with ResolveDefaultColumnsUtils
-  with SQLConfHelper {
+  with SQLConfHelper
+  with Logging {
   // Name of attributes representing explicit references to the value stored 
in the above
   // CURRENT_DEFAULT_COLUMN_METADATA.
   val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
@@ -311,36 +313,35 @@ object ResolveDefaultColumns extends QueryErrorsBase
       defaultSQL: String): Expression = {
     val supplanted = CharVarcharUtils.replaceCharVarcharWithString(dataType)
     // Perform implicit coercion from the provided expression type to the 
required column type.
-    val ret = if (supplanted == analyzed.dataType) {
-      analyzed
-    } else if (Cast.canUpCast(analyzed.dataType, supplanted)) {
-      Cast(analyzed, supplanted)
-    } else {
-      // If the provided default value is a literal of a wider type than the 
target column, but the
-      // literal value fits within the narrower type, just coerce it for 
convenience. Exclude
-      // boolean/array/struct/map types from consideration for this type 
coercion to avoid
-      // surprising behavior like interpreting "false" as integer zero.
-      val result = if (analyzed.isInstanceOf[Literal] &&
-        !Seq(supplanted, analyzed.dataType).exists(_ match {
+    val ret = analyzed match {
+      case equivalent if equivalent.dataType == supplanted =>
+        equivalent
+      case canUpCast if Cast.canUpCast(canUpCast.dataType, supplanted) =>
+        Cast(analyzed, supplanted, Some(conf.sessionLocalTimeZone))
+      case l: Literal
+        if !Seq(supplanted, l.dataType).exists(_ match {
           case _: BooleanType | _: ArrayType | _: StructType | _: MapType => 
true
           case _ => false
-        })) {
-        try {
-          val casted = Cast(analyzed, supplanted, evalMode = 
EvalMode.TRY).eval()
-          if (casted != null) {
-            Some(Literal(casted, supplanted))
-          } else {
-            None
-          }
+        }) =>
+        // If the provided default value is a literal of a wider type than the 
target column,
+        // but the literal value fits within the narrower type, just coerce it 
for convenience.
+        // Exclude boolean/array/struct/map types from consideration for this 
type coercion to
+        // avoid surprising behavior like interpreting "false" as integer zero.
+        val casted = Cast(l, supplanted, Some(conf.sessionLocalTimeZone), 
evalMode = EvalMode.TRY)
+        val result = try {
+          Option(casted.eval(EmptyRow)).map(Literal(_, supplanted))
         } catch {
-          case _: SparkThrowable | _: RuntimeException =>
+          case e @ ( _: SparkThrowable | _: RuntimeException) =>
+            logWarning(s"Failed to cast default value '$l' for column $colName 
from " +
+              s"${l.dataType} to $supplanted due to ${e.getMessage}")
             None
         }
-      } else None
-      result.getOrElse {
+        result.getOrElse(
+          throw QueryCompilationErrors.defaultValuesDataTypeError(
+            statementType, colName, defaultSQL, dataType, l.dataType))
+      case _ =>
         throw QueryCompilationErrors.defaultValuesDataTypeError(
           statementType, colName, defaultSQL, dataType, analyzed.dataType)
-      }
     }
     if (!conf.charVarcharAsString && 
CharVarcharUtils.hasCharVarchar(dataType)) {
       CharVarcharUtils.stringLengthCheck(ret, dataType).eval(EmptyRow)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
index 0b393f4b11b4..48a9564ab8f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
@@ -182,14 +182,14 @@ class ResolveDefaultColumnsSuite extends QueryTest with 
SharedSparkSession {
             "actualType" -> "\"STRING\""))
         checkError(
           exception = intercept[AnalysisException] {
-            sql("create table demos.test_ts_other (a timestamp default 
'2022-01-02') using parquet")
+            sql("create table demos.test_ts_other (a timestamp default 
'invalid') using parquet")
           },
           errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE",
           parameters = Map(
             "statement" -> "CREATE TABLE",
             "colName" -> "`a`",
             "expectedType" -> "\"TIMESTAMP\"",
-            "defaultValue" -> "'2022-01-02'",
+            "defaultValue" -> "'invalid'",
             "actualType" -> "\"STRING\""))
         checkError(
           exception = intercept[AnalysisException] {
@@ -256,4 +256,27 @@ class ResolveDefaultColumnsSuite extends QueryTest with 
SharedSparkSession {
       checkAnswer(sql("select * from t"), Row(1, "apache", "spark "))
     }
   }
+
+  test("SPARK-46958: timestamp should have timezone for resolvable if default 
values castable") {
+    val defaults = Seq("timestamp '2018-11-17'", "CAST(timestamp '2018-11-17' 
AS STRING)")
+    defaults.foreach { default =>
+      withTable("t") {
+        sql(s"CREATE TABLE t(key int, c STRING DEFAULT $default) " +
+          s"USING parquet")
+        sql("INSERT INTO t (key) VALUES(1)")
+        checkAnswer(sql("select * from t"), Row(1, "2018-11-17 00:00:00"))
+      }
+    }
+  }
+
+  test("SPARK-46958: timestamp should have timezone for resolvable when 
default values fit") {
+    withTable("t") {
+      // If the provided default value is a literal of a wider type than the 
target column, but
+      // the literal value fits within the narrower type, just coerce it for 
convenience.
+      sql(s"CREATE TABLE t(key int, c timestamp DEFAULT '2018-11-17 13:33:33') 
" +
+        s"USING parquet")
+      sql("INSERT INTO t (key) VALUES(1)")
+      checkAnswer(sql("select CAST(c as STRING) from t"), Row("2018-11-17 
13:33:33"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to