This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e40762ca41 [SPARK-46769][SQL] Refine timestamp related schema inference
e4e40762ca41 is described below

commit e4e40762ca41931646b8f201028b1f2298252d96
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Jan 20 20:57:09 2024 +0800

    [SPARK-46769][SQL] Refine timestamp related schema inference
    
    ### What changes were proposed in this pull request?
    
    This is a refinement of https://github.com/apache/spark/pull/43243 . This
    PR enforces one thing: we only infer the TIMESTAMP NTZ type using the NTZ
    parser, and only infer the LTZ type using the LTZ parser. This consistency
    is important to avoid non-deterministic behavior.
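
    As a hedged illustration of the invariant (not the exact code path touched
    by this PR; assumes a hypothetical CSV file /tmp/ts.csv with a header and a
    column of "2020-12-12 12:12:12" values):

    ```scala
    // The session default decides which parser may claim a value: under
    // TIMESTAMP_NTZ only the NTZ parser can infer TIMESTAMP_NTZ; otherwise
    // only the LTZ parser can infer TIMESTAMP (LTZ).
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.read.option("header", "true").option("inferSchema", "true")
      .csv("/tmp/ts.csv").printSchema()  // col0: timestamp_ntz

    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
    spark.read.option("header", "true").option("inferSchema", "true")
      .csv("/tmp/ts.csv").printSchema()  // col0: timestamp
    ```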
    
    ### Why are the changes needed?
    
    To avoid non-deterministic behavior. After
    https://github.com/apache/spark/pull/43243 , we can still have inconsistent
    results if LEGACY mode is enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, for the legacy parser: it is now more likely to infer the string type
    instead of inferring a timestamp type "by luck".
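
    A minimal sketch of the new legacy-mode outcome, mirroring the CSVSuite
    change below (assumes a hypothetical file /tmp/ntz.csv containing
    NTZ-formatted values like "2020-12-12T12:12:12" and the default LTZ session
    type):

    ```scala
    // Under the legacy parser, an NTZ-style string that the legacy LTZ parser
    // cannot handle is no longer promoted to TIMESTAMP during inference;
    // the column now falls back to string.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    spark.read.option("inferSchema", "true")
      .csv("/tmp/ntz.csv").printSchema()  // more likely now: _c0: string
    ```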
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes https://github.com/apache/spark/pull/44789
    
    Closes #44800 from cloud-fan/infer.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    | 18 +++++-----
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 31 ++++++++++++----
 .../sql/execution/datasources/csv/CSVSuite.scala   | 42 +++++++++++-----------
 3 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
     "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", 
"yyyy")
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
    * Similar to the JSON schema inference
    *     1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   }
 
   private def tryParseTimestampNTZ(field: String): DataType = {
-    // We can only parse the value as TimestampNTZType if it does not have zone-offset or
-    // time-zone component and can be parsed with the timestamp formatter.
-    // Otherwise, it is likely to be a timestamp with timezone.
-    val timestampType = SQLConf.get.timestampType
-    if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-        timestampType == TimestampNTZType) &&
-        timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      timestampType
+    // For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can
+    // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here
+    // we only try to infer NTZ if the config is set to use NTZ by default.
+    if (isDefaultNTZ &&
+      timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+      TimestampNTZType
     } else {
       tryParseTimestamp(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index cfc9e5520e53..bc7038fc71d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
@@ -57,6 +58,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
 
   private val ignoreCorruptFiles = options.ignoreCorruptFiles
   private val ignoreMissingFiles = options.ignoreMissingFiles
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+  private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY
 
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -159,16 +162,30 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
           val bigDecimal = decimalParser(field)
             DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        val timestampType = SQLConf.get.timestampType
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
-        } else if (options.inferTimestamp && 
(SQLConf.get.legacyTimeParserPolicy ==
-          LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+        } else if (options.inferTimestamp) {
+          // For text-based format, it's ambiguous to infer a timestamp string without timezone, as
+          // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+          // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+          if (isDefaultNTZ &&
             timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          timestampType
-        } else if (options.inferTimestamp &&
-            timestampFormatter.parseOptional(field).isDefined) {
-          TimestampType
+            TimestampNTZType
+          } else if (timestampFormatter.parseOptional(field).isDefined) {
+            TimestampType
+          } else if (legacyMode) {
+            val utf8Value = UTF8String.fromString(field)
+            // There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy
+            // mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep
+            // this behavior now. See SPARK-46769 for more details.
+            if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+              TimestampType
+            } else {
+              StringType
+            }
+          } else {
+            StringType
+          }
         } else {
           StringType
         }
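
For readers skimming the hunk above, a condensed restatement of the new
inference order (a sketch, not the literal method; it assumes the class
members introduced in this diff: timestampNTZFormatter, timestampFormatter,
isDefaultNTZ, legacyMode):

    // 1. The NTZ parser may only produce TIMESTAMP_NTZ, and only when NTZ is
    //    the session default.
    // 2. The LTZ parser may only produce TIMESTAMP (LTZ).
    // 3. Under LEGACY mode, keep the historical lenient NTZ-style parse that
    //    was (mistakenly) allowed to produce LTZ, for compatibility.
    // 4. Otherwise fall back to string.
    def inferTimestampType(field: String): DataType = {
      if (isDefaultNTZ &&
          timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
        TimestampNTZType
      } else if (timestampFormatter.parseOptional(field).isDefined) {
        TimestampType
      } else if (legacyMode && SparkDateTimeUtils.stringToTimestampWithoutTimeZone(
          UTF8String.fromString(field), false).isDefined) {
        TimestampType
      } else {
        StringType
      }
    }
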
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 1ded11731862..e1e39ac1590f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite
 
   test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ 
values") {
     withTempPath { path =>
-      val exp = spark.sql("""
-        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
-        select timestamp_ntz'2020-12-12 12:12:12' as col0
-        """)
+      val exp = spark.sql(
+        """
+          |select *
+          |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+          |as t(col0)
+          |""".stripMargin)
 
       exp.write.format("csv").option("header", 
"true").save(path.getAbsolutePath)
 
@@ -1126,6 +1128,15 @@ abstract class CSVSuite
 
           if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
             checkAnswer(res, exp)
+          } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+            // When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually
+            // infer string type.
+            val expected = spark.read
+              .format("csv")
+              .option("inferSchema", "false")
+              .option("header", "true")
+              .load(path.getAbsolutePath)
+            checkAnswer(res, expected)
           } else {
             checkAnswer(
               res,
@@ -2874,13 +2885,12 @@ abstract class CSVSuite
 
   test("SPARK-40474: Infer schema for columns with a mix of dates and 
timestamp") {
     withTempPath { path =>
-      Seq(
-        "1765-03-28",
+      val input = Seq(
         "1423-11-12T23:41:00",
+        "1765-03-28",
         "2016-01-28T20:00:00"
-      ).toDF()
-        .repartition(1)
-        .write.text(path.getAbsolutePath)
+      ).toDF().repartition(1)
+      input.write.text(path.getAbsolutePath)
 
       if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
         val options = Map(
@@ -2891,12 +2901,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options)
           .load(path.getAbsolutePath)
-        val expected = Seq(
-          Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
-          Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
-          Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
-        )
-        checkAnswer(df, expected)
+        checkAnswer(df, input)
       } else {
         // When timestampFormat is specified, infer and parse the column as strings
         val options1 = Map(
@@ -2907,12 +2912,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options1)
           .load(path.getAbsolutePath)
-        val expected1 = Seq(
-          Row("1765-03-28"),
-          Row("1423-11-12T23:41:00"),
-          Row("2016-01-28T20:00:00")
-        )
-        checkAnswer(df1, expected1)
+        checkAnswer(df1, input)
 
         // When timestampFormat is not specified, infer and parse the column as
         // timestamp type if possible


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
