This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new c19bf01b5208 [SPARK-46769][SQL] Refine timestamp related schema inference
c19bf01b5208 is described below

commit c19bf01b5208bb3aad0e6264b64597e0809b1efe
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Sat Jan 20 20:57:09 2024 +0800

    [SPARK-46769][SQL] Refine timestamp related schema inference
    
    ### What changes were proposed in this pull request?

    This is a refinement of https://github.com/apache/spark/pull/43243 . This PR
    enforces one rule: we only infer the TIMESTAMP NTZ type using the NTZ parser,
    and only infer the LTZ type using the LTZ parser. This consistency is
    important for avoiding non-deterministic behaviors.
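
    As a rough sketch of that rule (illustrative only, not the Spark source;
    `ntzParse` and `ltzParse` below are hypothetical stand-ins for the formatter
    calls used in `CSVInferSchema` and `JsonInferSchema`):

    ```scala
    import org.apache.spark.sql.types.{DataType, StringType, TimestampNTZType, TimestampType}

    // Sketch: each parser can only ever produce its own timestamp type.
    // `defaultToNTZ` stands for spark.sql.timestampType == TIMESTAMP_NTZ.
    def inferTimestampType(
        field: String,
        defaultToNTZ: Boolean,
        ntzParse: String => Option[Long],
        ltzParse: String => Option[Long]): DataType = {
      if (defaultToNTZ && ntzParse(field).isDefined) {
        TimestampNTZType // the NTZ parser yields NTZ, never LTZ
      } else if (ltzParse(field).isDefined) {
        TimestampType // the LTZ parser yields LTZ, never NTZ
      } else {
        StringType // neither parser accepts the value: fall back to string
      }
    }
    ```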
    
    ### Why are the changes needed?

    To avoid non-deterministic behaviors. Even after
    https://github.com/apache/spark/pull/43243 , we can still have an
    inconsistency when the LEGACY parser mode is enabled.
    
    ### Does this PR introduce any user-facing change?

    Yes, for the legacy parser: it is now more likely to infer the string type
    instead of inferring a timestamp type "by luck". An illustrative repro is
    sketched below.
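
    A hypothetical repro of the visible change (assumes a SparkSession named
    `spark` with `spark.sql.legacy.timeParserPolicy=LEGACY` and the default LTZ
    timestamp type; the `/tmp/legacy-ts-demo` path is made up):

    ```scala
    import spark.implicits._

    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    Seq("2020-12-12 12:12:12").toDF("col0")
      .write.option("header", "true").mode("overwrite").csv("/tmp/legacy-ts-demo")
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/tmp/legacy-ts-demo")
      .printSchema()
    // Before this change: col0 could be inferred as timestamp "by luck",
    // because the NTZ parser accepted the value but the LTZ type was returned.
    // After this change: the legacy LTZ parser decides directly, and a value
    // it rejects leaves the column inferred as string.
    ```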
    
    ### How was this patch tested?

    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?

    No.
    
    Closes https://github.com/apache/spark/pull/44789
    
    Closes #44800 from cloud-fan/infer.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit e4e40762ca41931646b8f201028b1f2298252d96)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    | 18 +++++-----
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 32 +++++++++++++----
 .../sql/execution/datasources/csv/CSVSuite.scala   | 42 +++++++++++-----------
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
     "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", 
"yyyy")
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
    * Similar to the JSON schema inference
    *     1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   }
 
   private def tryParseTimestampNTZ(field: String): DataType = {
-    // We can only parse the value as TimestampNTZType if it does not have zone-offset or
-    // time-zone component and can be parsed with the timestamp formatter.
-    // Otherwise, it is likely to be a timestamp with timezone.
-    val timestampType = SQLConf.get.timestampType
-    if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-        timestampType == TimestampNTZType) &&
-        timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      timestampType
+    // For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can
+    // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here
+    // we only try to infer NTZ if the config is set to use NTZ by default.
+    if (isDefaultNTZ &&
+      timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+      TimestampNTZType
     } else {
       tryParseTimestamp(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6a..f6d32f39f64e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+  private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY
+
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
     parseMode match {
@@ -148,16 +152,30 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
           val bigDecimal = decimalParser(field)
             DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        val timestampType = SQLConf.get.timestampType
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
-        } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
-          LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+        } else if (options.inferTimestamp) {
+          // For text-based format, it's ambiguous to infer a timestamp string without timezone, as
+          // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+          // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+          if (isDefaultNTZ &&
             timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          timestampType
-        } else if (options.inferTimestamp &&
-            timestampFormatter.parseOptional(field).isDefined) {
-          TimestampType
+            TimestampNTZType
+          } else if (timestampFormatter.parseOptional(field).isDefined) {
+            TimestampType
+          } else if (legacyMode) {
+            val utf8Value = UTF8String.fromString(field)
+            // There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy
+            // mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep
+            // this behavior now. See SPARK-46769 for more details.
+            if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+              TimestampType
+            } else {
+              StringType
+            }
+          } else {
+            StringType
+          }
         } else {
           StringType
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 3bd45ca0dcdb..78266acfd7de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite
 
   test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ 
values") {
     withTempPath { path =>
-      val exp = spark.sql("""
-        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
-        select timestamp_ntz'2020-12-12 12:12:12' as col0
-        """)
+      val exp = spark.sql(
+        """
+          |select *
+          |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+          |as t(col0)
+          |""".stripMargin)
 
       exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)
 
@@ -1126,6 +1128,15 @@ abstract class CSVSuite
 
           if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
             checkAnswer(res, exp)
+          } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+            // When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually
+            // infer string type.
+            val expected = spark.read
+              .format("csv")
+              .option("inferSchema", "false")
+              .option("header", "true")
+              .load(path.getAbsolutePath)
+            checkAnswer(res, expected)
           } else {
             checkAnswer(
               res,
@@ -2862,13 +2873,12 @@ abstract class CSVSuite
 
   test("SPARK-40474: Infer schema for columns with a mix of dates and 
timestamp") {
     withTempPath { path =>
-      Seq(
-        "1765-03-28",
+      val input = Seq(
         "1423-11-12T23:41:00",
+        "1765-03-28",
         "2016-01-28T20:00:00"
-      ).toDF()
-        .repartition(1)
-        .write.text(path.getAbsolutePath)
+      ).toDF().repartition(1)
+      input.write.text(path.getAbsolutePath)
 
       if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
         val options = Map(
@@ -2879,12 +2889,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options)
           .load(path.getAbsolutePath)
-        val expected = Seq(
-          Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
-          Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
-          Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
-        )
-        checkAnswer(df, expected)
+        checkAnswer(df, input)
       } else {
        // When timestampFormat is specified, infer and parse the column as strings
         val options1 = Map(
@@ -2895,12 +2900,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options1)
           .load(path.getAbsolutePath)
-        val expected1 = Seq(
-          Row("1765-03-28"),
-          Row("1423-11-12T23:41:00"),
-          Row("2016-01-28T20:00:00")
-        )
-        checkAnswer(df1, expected1)
+        checkAnswer(df1, input)
 
         // When timestampFormat is not specified, infer and parse the column as
         // timestamp type if possible

