This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e4e40762ca41 [SPARK-46769][SQL] Refine timestamp related schema inference
e4e40762ca41 is described below
commit e4e40762ca41931646b8f201028b1f2298252d96
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Jan 20 20:57:09 2024 +0800
[SPARK-46769][SQL] Refine timestamp related schema inference
### What changes were proposed in this pull request?
This is a refinement of https://github.com/apache/spark/pull/43243 . This
PR enforces one thing: we only infer the TIMESTAMP NTZ type using the NTZ
parser, and only infer the TIMESTAMP LTZ type using the LTZ parser. This
consistency is important for avoiding non-deterministic behavior.
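To make the rule concrete, here is a minimal self-contained sketch (not Spark's actual code: `infer`, `preferNTZ`, and the `java.time` parsers are stand-ins for Spark's `TimestampFormatter` instances and the `spark.sql.timestampType` config):

```scala
import java.time.{LocalDateTime, OffsetDateTime}
import scala.util.Try

object InferSketch {
  // Stand-in for the NTZ parser: accepts only timestamps without a zone.
  private def parseNTZ(s: String): Option[LocalDateTime] =
    Try(LocalDateTime.parse(s)).toOption

  // Stand-in for the LTZ parser: accepts only timestamps with an offset.
  private def parseLTZ(s: String): Option[OffsetDateTime] =
    Try(OffsetDateTime.parse(s)).toOption

  // The consistency rule: NTZ is inferred only via the NTZ parser,
  // LTZ only via the LTZ parser; otherwise fall back to string.
  def infer(field: String, preferNTZ: Boolean): String =
    if (preferNTZ && parseNTZ(field).isDefined) "TIMESTAMP_NTZ"
    else if (parseLTZ(field).isDefined) "TIMESTAMP_LTZ"
    else "STRING"

  def main(args: Array[String]): Unit = {
    println(infer("2020-12-12T12:12:12", preferNTZ = true))        // TIMESTAMP_NTZ
    println(infer("2020-12-12T12:12:12+08:00", preferNTZ = false)) // TIMESTAMP_LTZ
    println(infer("not a timestamp", preferNTZ = false))           // STRING
  }
}
```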
### Why are the changes needed?
Avoid non-deterministic behavior. Even after
https://github.com/apache/spark/pull/43243 , inference can still be
inconsistent when LEGACY mode is enabled.
### Does this PR introduce _any_ user-facing change?
Yes, for the legacy parser: it is now more likely to infer the string type
instead of inferring a timestamp type "by luck".
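As a hedged illustration of this change (the path and values are hypothetical, and the exact outcome depends on the formats the legacy parser accepts), schema inference under the legacy policy can now fall back to string where it previously produced a timestamp:

```scala
// Hypothetical repro: `spark` is an existing SparkSession and
// /tmp/ts.csv contains a header plus timestamp-looking strings.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/ts.csv")
// Before this PR a column could be inferred as timestamp "by luck";
// after it, values the LTZ parser rejects are inferred as string.
df.printSchema()
```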
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes https://github.com/apache/spark/pull/44789
Closes #44800 from cloud-fan/infer.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/csv/CSVInferSchema.scala | 18 +++++-----
.../spark/sql/catalyst/json/JsonInferSchema.scala | 31 ++++++++++++----
.../sql/execution/datasources/csv/CSVSuite.scala | 42 +++++++++++-----------
3 files changed, 54 insertions(+), 37 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
"yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy")
+ private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
/**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
}
private def tryParseTimestampNTZ(field: String): DataType = {
- // We can only parse the value as TimestampNTZType if it does not have zone-offset or
- // time-zone component and can be parsed with the timestamp formatter.
- // Otherwise, it is likely to be a timestamp with timezone.
- val timestampType = SQLConf.get.timestampType
- if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
- timestampType == TimestampNTZType) &&
- timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
- timestampType
+ // For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can
+ // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here
+ // we only try to infer NTZ if the config is set to use NTZ by default.
+ if (isDefaultNTZ &&
+ timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+ TimestampNTZType
} else {
tryParseTimestamp(field)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index cfc9e5520e53..bc7038fc71d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
@@ -57,6 +58,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
private val ignoreCorruptFiles = options.ignoreCorruptFiles
private val ignoreMissingFiles = options.ignoreMissingFiles
+ private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+ private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY
private def handleJsonErrorsByParseMode(parseMode: ParseMode,
columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -159,16 +162,30 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
val bigDecimal = decimalParser(field)
DecimalType(bigDecimal.precision, bigDecimal.scale)
}
- val timestampType = SQLConf.get.timestampType
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
- } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
- LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+ } else if (options.inferTimestamp) {
+ // For text-based format, it's ambiguous to infer a timestamp string without timezone, as
+ // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+ // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+ if (isDefaultNTZ &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
- timestampType
- } else if (options.inferTimestamp &&
- timestampFormatter.parseOptional(field).isDefined) {
- TimestampType
+ TimestampNTZType
+ } else if (timestampFormatter.parseOptional(field).isDefined) {
+ TimestampType
+ } else if (legacyMode) {
+ val utf8Value = UTF8String.fromString(field)
+ // There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy
+ // mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep
+ // this behavior now. See SPARK-46769 for more details.
+ if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+ TimestampType
+ } else {
+ StringType
+ }
+ } else {
+ StringType
+ }
} else {
StringType
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 1ded11731862..e1e39ac1590f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite
test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ
values") {
withTempPath { path =>
- val exp = spark.sql("""
- select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
- select timestamp_ntz'2020-12-12 12:12:12' as col0
- """)
+ val exp = spark.sql(
+ """
+ |select *
+ |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+ |as t(col0)
+ |""".stripMargin)
exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)
@@ -1126,6 +1128,15 @@ abstract class CSVSuite
if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
+ } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+ // When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually
+ // infer string type.
+ val expected = spark.read
+ .format("csv")
+ .option("inferSchema", "false")
+ .option("header", "true")
+ .load(path.getAbsolutePath)
+ checkAnswer(res, expected)
} else {
checkAnswer(
res,
@@ -2874,13 +2885,12 @@ abstract class CSVSuite
test("SPARK-40474: Infer schema for columns with a mix of dates and
timestamp") {
withTempPath { path =>
- Seq(
- "1765-03-28",
+ val input = Seq(
"1423-11-12T23:41:00",
+ "1765-03-28",
"2016-01-28T20:00:00"
- ).toDF()
- .repartition(1)
- .write.text(path.getAbsolutePath)
+ ).toDF().repartition(1)
+ input.write.text(path.getAbsolutePath)
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
val options = Map(
@@ -2891,12 +2901,7 @@ abstract class CSVSuite
.format("csv")
.options(options)
.load(path.getAbsolutePath)
- val expected = Seq(
- Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
- Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
- Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
- )
- checkAnswer(df, expected)
+ checkAnswer(df, input)
} else {
// When timestampFormat is specified, infer and parse the column as strings
val options1 = Map(
@@ -2907,12 +2912,7 @@ abstract class CSVSuite
.format("csv")
.options(options1)
.load(path.getAbsolutePath)
- val expected1 = Seq(
- Row("1765-03-28"),
- Row("1423-11-12T23:41:00"),
- Row("2016-01-28T20:00:00")
- )
- checkAnswer(df1, expected1)
+ checkAnswer(df1, input)
// When timestampFormat is not specified, infer and parse the column as
// timestamp type if possible
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]