This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0e7c35c279c [SPARK-42243][SQL] Use 
`spark.sql.inferTimestampNTZInDataSources.enabled` to infer timestamp type on 
partition columns
0e7c35c279c is described below

commit 0e7c35c279ca0b7d5be51714bdd6b9b5f90dcf08
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Jan 30 22:59:07 2023 -0800

    [SPARK-42243][SQL] Use `spark.sql.inferTimestampNTZInDataSources.enabled` 
to infer timestamp type on partition columns
    
    ### What changes were proposed in this pull request?
    
    Use `spark.sql.inferTimestampNTZInDataSources.enabled` to infer timestamp 
type on partition columns, instead of `spark.sql.timestampType`.
    
    ### Why are the changes needed?
    
    Similar to https://github.com/apache/spark/pull/39777:
    * make the schema inference in data sources consistent
    * use a light-weight configuration for data source schema inference.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, TimestampNTZ is not released yet.
    
    ### How was this patch tested?
    
    UT
    
    Closes #39812 from gengliangwang/partitionNTZ.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit b509ad15714538550ecfd6eeebd968aff29ec364)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    |  6 +--
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  6 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 +++++++-
 .../execution/datasources/PartitioningUtils.scala  |  6 +--
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 45 ++++++++++++----------
 5 files changed, 43 insertions(+), 35 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 57e683abc13..826e8584db4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -203,11 +203,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
     if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-      if (SQLConf.get.inferTimestampNTZInDataSources) {
-        TimestampNTZType
-      } else {
-        TimestampType
-      }
+      SQLConf.get.timestampTypeInSchemaInference
     } else {
       tryParseTimestamp(field)
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index f5721d7aa8e..b1429e6b215 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -152,11 +152,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
           decimalTry.get
         } else if (options.inferTimestamp &&
             timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-          if (SQLConf.get.inferTimestampNTZInDataSources) {
-            TimestampNTZType
-          } else {
-            TimestampType
-          }
+          SQLConf.get.timestampTypeInSchemaInference
         } else if (options.inferTimestamp &&
             timestampFormatter.parseOptional(field).isDefined) {
           TimestampType
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 925769a91bb..4f2d5f6c106 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3511,8 +3511,9 @@ object SQLConf {
     buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
       .doc("When true, the TimestampNTZ type is the prior choice of the schema 
inference " +
         "over built-in data sources. Otherwise, the inference result will be 
TimestampLTZ for " +
-        "backward compatibility. As a result, for JSON/CSV files written with 
TimestampNTZ " +
-        "columns, the inference results will still be of TimestampLTZ types.")
+        "backward compatibility. As a result, for JSON/CSV files and partition 
directories " +
+        "written with TimestampNTZ columns, the inference results will still 
be of TimestampLTZ " +
+        "types.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
@@ -4807,6 +4808,16 @@ class SQLConf extends Serializable with Logging {
 
   def inferTimestampNTZInDataSources: Boolean = 
getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)
 
+  // Preferred timestamp type in schema inference when a column can be either 
Timestamp type or
+  // TimestampNTZ type.
+  def timestampTypeInSchemaInference: AtomicType = {
+    if (getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)) {
+      TimestampNTZType
+    } else {
+      TimestampType
+    }
+  }
+
   def nestedSchemaPruningEnabled: Boolean = 
getConf(NESTED_SCHEMA_PRUNING_ENABLED)
 
   def serializerNestedSchemaPruningEnabled: Boolean =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 4f43c130525..38c3f71ab49 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -488,10 +488,10 @@ object PartitioningUtils extends SQLConfHelper {
 
     val timestampTry = Try {
       val unescapedRaw = unescapePathName(raw)
-      // the inferred data type is consistent with the default timestamp type
-      val timestampType = conf.timestampType
       // try and parse the date, if no exception occurs this is a candidate to 
be resolved as
-      // TimestampType or TimestampNTZType
+      // TimestampType or TimestampNTZType. The inferred timestamp type is 
controlled by the conf
+      // "spark.sql.inferTimestampNTZInDataSources.enabled".
+      val timestampType = conf.timestampTypeInSchemaInference
       timestampType match {
         case TimestampType => timestampFormatter.parse(unescapedRaw)
         case TimestampNTZType => 
timestampFormatter.parseWithoutTimeZone(unescapedRaw)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 6151e1d7cb1..d91320bee7e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -38,7 +38,6 @@ import 
org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
FileTable}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.TimestampTypes
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -83,11 +82,16 @@ abstract class ParquetPartitionDiscoverySuite
     check("1.5", DoubleType)
     check("hello", StringType)
     check("1990-02-24", DateType)
-    // The inferred timestmap type is consistent with the value of 
`SQLConf.TIMESTAMP_TYPE`
-    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { 
tsType =>
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
-        check("1990-02-24 12:00:30", SQLConf.get.timestampType)
-        check("1990-02-24 12:00:30", SQLConf.get.timestampType, ZoneOffset.UTC)
+    // The inferred timestamp type is controlled by 
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+    Seq(false, true).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> 
inferTimestampNTZ.toString) {
+        val timestampType = if (inferTimestampNTZ) {
+          TimestampNTZType
+        } else {
+          TimestampType
+        }
+        check("1990-02-24 12:00:30", timestampType)
+        check("1990-02-24 12:00:30", timestampType, ZoneOffset.UTC)
       }
     }
 
@@ -368,16 +372,16 @@ abstract class ParquetPartitionDiscoverySuite
       s"hdfs://host:9000/path2"),
       PartitionSpec.emptySpec)
 
-    // The inferred timestmap type is consistent with the value of 
`SQLConf.TIMESTAMP_TYPE`
-    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { 
tsType =>
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
+    // The inferred timestamp type is controlled by 
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+    Seq(false, true).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> 
inferTimestampNTZ.toString) {
         // The cases below check the resolution for type conflicts.
-        val t1 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+        val t1 = if (!inferTimestampNTZ) {
           Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
         } else {
           localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:00:00"))
         }
-        val t2 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+        val t2 = if (!inferTimestampNTZ) {
           Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
         } else {
           localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00"))
@@ -392,7 +396,7 @@ abstract class ParquetPartitionDiscoverySuite
           s"hdfs://host:9000/path/a=2014-01-01 
00%3A01%3A00.0/b=$defaultPartitionName"),
           PartitionSpec(
             StructType(Seq(
-              StructField("a", SQLConf.get.timestampType),
+              StructField("a", SQLConf.get.timestampTypeInSchemaInference),
               StructField("b", DecimalType(22, 0)))),
             Seq(
               Partition(
@@ -657,9 +661,10 @@ abstract class ParquetPartitionDiscoverySuite
   }
 
   test("Various partition value types") {
-    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { 
tsType =>
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
-        val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+    // The inferred timestamp type is controlled by 
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+    Seq(false, true).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> 
inferTimestampNTZ.toString) {
+        val ts = if (!inferTimestampNTZ) {
           new Timestamp(0)
         } else {
           LocalDateTime.parse("1970-01-01T00:00:00")
@@ -691,7 +696,7 @@ abstract class ParquetPartitionDiscoverySuite
             DecimalType(10, 5),
             DecimalType.SYSTEM_DEFAULT,
             DateType,
-            SQLConf.get.timestampType,
+            SQLConf.get.timestampTypeInSchemaInference,
             StringType)
 
         val partitionColumns = partitionColumnTypes.zipWithIndex.map {
@@ -722,9 +727,9 @@ abstract class ParquetPartitionDiscoverySuite
   }
 
   test("Various inferred partition value types") {
-    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { 
tsType =>
-      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
-        val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+    Seq(false, true).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> 
inferTimestampNTZ.toString) {
+        val ts = if (!inferTimestampNTZ) {
           Timestamp.valueOf("1990-02-24 12:00:30")
         } else {
           LocalDateTime.parse("1990-02-24T12:00:30")
@@ -745,7 +750,7 @@ abstract class ParquetPartitionDiscoverySuite
             DoubleType,
             DecimalType(20, 0),
             DateType,
-            SQLConf.get.timestampType,
+            SQLConf.get.timestampTypeInSchemaInference,
             StringType)
 
         val partitionColumns = partitionColumnTypes.zipWithIndex.map {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to