This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new c19bf01b5208 [SPARK-46769][SQL] Refine timestamp related schema
inference
c19bf01b5208 is described below
commit c19bf01b5208bb3aad0e6264b64597e0809b1efe
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Jan 20 20:57:09 2024 +0800
[SPARK-46769][SQL] Refine timestamp related schema inference
This is a refinement of https://github.com/apache/spark/pull/43243 . This
PR enforces one thing: we only infer TIMESTAMP NTZ type using NTZ parser, and
only infer LTZ type using LTZ parser. This consistency is important to avoid
nondeterministic behaviors.
This change avoids nondeterministic behaviors: after
https://github.com/apache/spark/pull/43243, we can still have inconsistency if
the LEGACY mode is enabled.
Yes, for the legacy parser: it is now more likely to infer string type
instead of inferring timestamp type "by luck".
existing tests
no
Closes https://github.com/apache/spark/pull/44789
Closes #44800 from cloud-fan/infer.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e4e40762ca41931646b8f201028b1f2298252d96)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/csv/CSVInferSchema.scala | 18 +++++-----
.../spark/sql/catalyst/json/JsonInferSchema.scala | 32 +++++++++++++----
.../sql/execution/datasources/csv/CSVSuite.scala | 42 +++++++++++-----------
3 files changed, 55 insertions(+), 37 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends
Serializable {
private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
"yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M",
"yyyy")
+ private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
/**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends
Serializable {
}
private def tryParseTimestampNTZ(field: String): DataType = {
- // We can only parse the value as TimestampNTZType if it does not have
zone-offset or
- // time-zone component and can be parsed with the timestamp formatter.
- // Otherwise, it is likely to be a timestamp with timezone.
- val timestampType = SQLConf.get.timestampType
- if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
- timestampType == TimestampNTZType) &&
- timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- timestampType
+ // For text-based format, it's ambiguous to infer a timestamp string
without timezone, as it can
+ // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new
support of NTZ, here
+ // we only try to infer NTZ if the config is set to use NTZ by default.
+ if (isDefaultNTZ &&
+ timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
+ TimestampNTZType
} else {
tryParseTimestamp(field)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6a..f6d32f39f64e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -34,6 +34,7 @@ import
org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
isParsing = true,
forTimestampNTZ = true)
+ private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+ private val legacyMode = SQLConf.get.legacyTimeParserPolicy ==
LegacyBehaviorPolicy.LEGACY
+
private def handleJsonErrorsByParseMode(parseMode: ParseMode,
columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
parseMode match {
@@ -148,16 +152,30 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
val bigDecimal = decimalParser(field)
DecimalType(bigDecimal.precision, bigDecimal.scale)
}
- val timestampType = SQLConf.get.timestampType
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
- } else if (options.inferTimestamp &&
(SQLConf.get.legacyTimeParserPolicy ==
- LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+ } else if (options.inferTimestamp) {
+ // For text-based format, it's ambiguous to infer a timestamp string
without timezone, as
+ // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes
with the new support
+ // of NTZ, here we only try to infer NTZ if the config is set to use
NTZ by default.
+ if (isDefaultNTZ &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- timestampType
- } else if (options.inferTimestamp &&
- timestampFormatter.parseOptional(field).isDefined) {
- TimestampType
+ TimestampNTZType
+ } else if (timestampFormatter.parseOptional(field).isDefined) {
+ TimestampType
+ } else if (legacyMode) {
+ val utf8Value = UTF8String.fromString(field)
+ // There was a mistake that we use TIMESTAMP NTZ parser to infer
LTZ type with legacy
+ // mode. The mistake makes it easier to infer TIMESTAMP LTZ type
and we have to keep
+ // this behavior now. See SPARK-46769 for more details.
+ if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value,
false).isDefined) {
+ TimestampType
+ } else {
+ StringType
+ }
+ } else {
+ StringType
+ }
} else {
StringType
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 3bd45ca0dcdb..78266acfd7de 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite
test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ
values") {
withTempPath { path =>
- val exp = spark.sql("""
- select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
- select timestamp_ntz'2020-12-12 12:12:12' as col0
- """)
+ val exp = spark.sql(
+ """
+ |select *
+ |from values (timestamp_ntz'2020-12-12 12:12:12'),
(timestamp_ntz'2020-12-12 12:12:12')
+ |as t(col0)
+ |""".stripMargin)
exp.write.format("csv").option("header",
"true").save(path.getAbsolutePath)
@@ -1126,6 +1128,15 @@ abstract class CSVSuite
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
+ } else if (SQLConf.get.legacyTimeParserPolicy ==
LegacyBehaviorPolicy.LEGACY) {
+ // When legacy parser is enabled, we can't parse the NTZ string to
LTZ, and eventually
+ // infer string type.
+ val expected = spark.read
+ .format("csv")
+ .option("inferSchema", "false")
+ .option("header", "true")
+ .load(path.getAbsolutePath)
+ checkAnswer(res, expected)
} else {
checkAnswer(
res,
@@ -2862,13 +2873,12 @@ abstract class CSVSuite
test("SPARK-40474: Infer schema for columns with a mix of dates and
timestamp") {
withTempPath { path =>
- Seq(
- "1765-03-28",
+ val input = Seq(
"1423-11-12T23:41:00",
+ "1765-03-28",
"2016-01-28T20:00:00"
- ).toDF()
- .repartition(1)
- .write.text(path.getAbsolutePath)
+ ).toDF().repartition(1)
+ input.write.text(path.getAbsolutePath)
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
val options = Map(
@@ -2879,12 +2889,7 @@ abstract class CSVSuite
.format("csv")
.options(options)
.load(path.getAbsolutePath)
- val expected = Seq(
- Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
- Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
- Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
- )
- checkAnswer(df, expected)
+ checkAnswer(df, input)
} else {
// When timestampFormat is specified, infer and parse the column as
strings
val options1 = Map(
@@ -2895,12 +2900,7 @@ abstract class CSVSuite
.format("csv")
.options(options1)
.load(path.getAbsolutePath)
- val expected1 = Seq(
- Row("1765-03-28"),
- Row("1423-11-12T23:41:00"),
- Row("2016-01-28T20:00:00")
- )
- checkAnswer(df1, expected1)
+ checkAnswer(df1, input)
// When timestampFormat is not specified, infer and parse the column as
// timestamp type if possible
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]