This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0e7c35c279c [SPARK-42243][SQL] Use
`spark.sql.inferTimestampNTZInDataSources.enabled` to infer timestamp type on
partition columns
0e7c35c279c is described below
commit 0e7c35c279ca0b7d5be51714bdd6b9b5f90dcf08
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Jan 30 22:59:07 2023 -0800
[SPARK-42243][SQL] Use `spark.sql.inferTimestampNTZInDataSources.enabled`
to infer timestamp type on partition columns
### What changes were proposed in this pull request?
Use `spark.sql.inferTimestampNTZInDataSources.enabled` to infer timestamp
type on partition columns, instead of `spark.sql.timestampType`.
### Why are the changes needed?
Similar to https://github.com/apache/spark/pull/39777:
* make the schema inference in data sources consistent
* use a light-weight configuration for data source schema inference.
### Does this PR introduce _any_ user-facing change?
No, TimestampNTZ is not released yet.
### How was this patch tested?
UT
Closes #39812 from gengliangwang/partitionNTZ.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit b509ad15714538550ecfd6eeebd968aff29ec364)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/catalyst/csv/CSVInferSchema.scala | 6 +--
.../spark/sql/catalyst/json/JsonInferSchema.scala | 6 +--
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +++++++-
.../execution/datasources/PartitioningUtils.scala | 6 +--
.../parquet/ParquetPartitionDiscoverySuite.scala | 45 ++++++++++++----------
5 files changed, 43 insertions(+), 35 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 57e683abc13..826e8584db4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -203,11 +203,7 @@ class CSVInferSchema(val options: CSVOptions) extends
Serializable {
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- if (SQLConf.get.inferTimestampNTZInDataSources) {
- TimestampNTZType
- } else {
- TimestampType
- }
+ SQLConf.get.timestampTypeInSchemaInference
} else {
tryParseTimestamp(field)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index f5721d7aa8e..b1429e6b215 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -152,11 +152,7 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
decimalTry.get
} else if (options.inferTimestamp &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- if (SQLConf.get.inferTimestampNTZInDataSources) {
- TimestampNTZType
- } else {
- TimestampType
- }
+ SQLConf.get.timestampTypeInSchemaInference
} else if (options.inferTimestamp &&
timestampFormatter.parseOptional(field).isDefined) {
TimestampType
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 925769a91bb..4f2d5f6c106 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3511,8 +3511,9 @@ object SQLConf {
buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
.doc("When true, the TimestampNTZ type is the prior choice of the schema
inference " +
"over built-in data sources. Otherwise, the inference result will be
TimestampLTZ for " +
- "backward compatibility. As a result, for JSON/CSV files written with
TimestampNTZ " +
- "columns, the inference results will still be of TimestampLTZ types.")
+ "backward compatibility. As a result, for JSON/CSV files and partition
directories " +
+ "written with TimestampNTZ columns, the inference results will still
be of TimestampLTZ " +
+ "types.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)
@@ -4807,6 +4808,16 @@ class SQLConf extends Serializable with Logging {
def inferTimestampNTZInDataSources: Boolean =
getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)
+ // Preferred timestamp type in schema inference when a column can be either
Timestamp type or
+ // TimestampNTZ type.
+ def timestampTypeInSchemaInference: AtomicType = {
+ if (getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)) {
+ TimestampNTZType
+ } else {
+ TimestampType
+ }
+ }
+
def nestedSchemaPruningEnabled: Boolean =
getConf(NESTED_SCHEMA_PRUNING_ENABLED)
def serializerNestedSchemaPruningEnabled: Boolean =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 4f43c130525..38c3f71ab49 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -488,10 +488,10 @@ object PartitioningUtils extends SQLConfHelper {
val timestampTry = Try {
val unescapedRaw = unescapePathName(raw)
- // the inferred data type is consistent with the default timestamp type
- val timestampType = conf.timestampType
// try and parse the date, if no exception occurs this is a candidate to
be resolved as
- // TimestampType or TimestampNTZType
+ // TimestampType or TimestampNTZType. The inferred timestamp type is
controlled by the conf
+ // "spark.sql.inferTimestampNTZInDataSources.enabled".
+ val timestampType = conf.timestampTypeInSchemaInference
timestampType match {
case TimestampType => timestampFormatter.parse(unescapedRaw)
case TimestampNTZType =>
timestampFormatter.parseWithoutTimeZone(unescapedRaw)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 6151e1d7cb1..d91320bee7e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -38,7 +38,6 @@ import
org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
FileTable}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.TimestampTypes
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -83,11 +82,16 @@ abstract class ParquetPartitionDiscoverySuite
check("1.5", DoubleType)
check("hello", StringType)
check("1990-02-24", DateType)
- // The inferred timestmap type is consistent with the value of
`SQLConf.TIMESTAMP_TYPE`
- Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach {
tsType =>
- withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
- check("1990-02-24 12:00:30", SQLConf.get.timestampType)
- check("1990-02-24 12:00:30", SQLConf.get.timestampType, ZoneOffset.UTC)
+ // The inferred timestamp type is controlled by
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+ Seq(false, true).foreach { inferTimestampNTZ =>
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key ->
inferTimestampNTZ.toString) {
+ val timestampType = if (inferTimestampNTZ) {
+ TimestampNTZType
+ } else {
+ TimestampType
+ }
+ check("1990-02-24 12:00:30", timestampType)
+ check("1990-02-24 12:00:30", timestampType, ZoneOffset.UTC)
}
}
@@ -368,16 +372,16 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)
- // The inferred timestmap type is consistent with the value of
`SQLConf.TIMESTAMP_TYPE`
- Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach {
tsType =>
- withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
+ // The inferred timestamp type is controlled by
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+ Seq(false, true).foreach { inferTimestampNTZ =>
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key ->
inferTimestampNTZ.toString) {
// The cases below check the resolution for type conflicts.
- val t1 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+ val t1 = if (!inferTimestampNTZ) {
Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
} else {
localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:00:00"))
}
- val t2 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+ val t2 = if (!inferTimestampNTZ) {
Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
} else {
localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00"))
@@ -392,7 +396,7 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path/a=2014-01-01
00%3A01%3A00.0/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
- StructField("a", SQLConf.get.timestampType),
+ StructField("a", SQLConf.get.timestampTypeInSchemaInference),
StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
@@ -657,9 +661,10 @@ abstract class ParquetPartitionDiscoverySuite
}
test("Various partition value types") {
- Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach {
tsType =>
- withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
- val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+ // The inferred timestamp type is controlled by
`SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
+ Seq(false, true).foreach { inferTimestampNTZ =>
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key ->
inferTimestampNTZ.toString) {
+ val ts = if (!inferTimestampNTZ) {
new Timestamp(0)
} else {
LocalDateTime.parse("1970-01-01T00:00:00")
@@ -691,7 +696,7 @@ abstract class ParquetPartitionDiscoverySuite
DecimalType(10, 5),
DecimalType.SYSTEM_DEFAULT,
DateType,
- SQLConf.get.timestampType,
+ SQLConf.get.timestampTypeInSchemaInference,
StringType)
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
@@ -722,9 +727,9 @@ abstract class ParquetPartitionDiscoverySuite
}
test("Various inferred partition value types") {
- Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach {
tsType =>
- withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
- val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+ Seq(false, true).foreach { inferTimestampNTZ =>
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key ->
inferTimestampNTZ.toString) {
+ val ts = if (!inferTimestampNTZ) {
Timestamp.valueOf("1990-02-24 12:00:30")
} else {
LocalDateTime.parse("1990-02-24T12:00:30")
@@ -745,7 +750,7 @@ abstract class ParquetPartitionDiscoverySuite
DoubleType,
DecimalType(20, 0),
DateType,
- SQLConf.get.timestampType,
+ SQLConf.get.timestampTypeInSchemaInference,
StringType)
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]