This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 7e3ddc1e582 [SPARK-45433][SQL] Fix CSV/JSON schema inference when
timestamps do not match specified timestampFormat
7e3ddc1e582 is described below
commit 7e3ddc1e582a6e4fa96bab608c4c2bbc2c93b449
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 11 19:33:23 2023 +0300
[SPARK-45433][SQL] Fix CSV/JSON schema inference when timestamps do not
match specified timestampFormat
### What changes were proposed in this pull request?
This PR fixes CSV/JSON schema inference, which reports an error when
timestamps do not match the specified timestampFormat.
```scala
//eg
val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
csv.show()
//error
Caused by: java.time.format.DateTimeParseException: Text
'2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19
```
This bug only happens when a partition has a single row. The data type should be
`StringType`, not `TimestampType`, because the value does not match `timestampFormat`.
Taking CSV as an example: in `CSVInferSchema::tryParseTimestampNTZ`, the value
is first checked with `timestampNTZFormatter.parseWithoutTimeZoneOptional`,
which infers `TimestampType`. If the same partition has another row,
`tryParseTimestamp` is then used to parse that row with the user-defined
`timestampFormat`; it finds that the value cannot be converted to a timestamp
with `timestampFormat`, and finally `StringType` is returned. But when there is
only one row, we use `timestampNTZFormatter.parseWithoutTimeZoneOptional` to
parse normally timestamp not r [...]
### Why are the changes needed?
Fix schema inference bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new tests to `CSVInferSchemaSuite` and `JsonInferSchemaSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43243 from
Hisoka-X/SPARK-45433-inference-mismatch-timestamp-one-row.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit eae5c0e1efce83c2bb08754784db070be285285a)
Signed-off-by: Max Gekk <[email protected]>
---
.../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 9 ++++++---
.../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 8 +++++---
.../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 10 ++++++++++
.../apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala | 8 ++++++++
4 files changed, 29 insertions(+), 6 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 51586a0065e..ec01b56f9eb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -202,8 +202,11 @@ class CSVInferSchema(val options: CSVOptions) extends
Serializable {
// We can only parse the value as TimestampNTZType if it does not have
zone-offset or
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
- if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- SQLConf.get.timestampType
+ val timestampType = SQLConf.get.timestampType
+ if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
+ timestampType == TimestampNTZType) &&
+ timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
+ timestampType
} else {
tryParseTimestamp(field)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 5385afe8c93..4123c5290b6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -32,7 +32,7 @@ import
org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -148,11 +148,13 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
val bigDecimal = decimalParser(field)
DecimalType(bigDecimal.precision, bigDecimal.scale)
}
+ val timestampType = SQLConf.get.timestampType
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
- } else if (options.inferTimestamp &&
+ } else if (options.inferTimestamp &&
(SQLConf.get.legacyTimeParserPolicy ==
+ LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field,
false).isDefined) {
- SQLConf.get.timestampType
+ timestampType
} else if (options.inferTimestamp &&
timestampFormatter.parseOptional(field).isDefined) {
TimestampType
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index acedf7998c2..fb91200557a 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -263,4 +263,14 @@ class CSVInferSchemaSuite extends SparkFunSuite with
SQLHelper {
inferSchema = new CSVInferSchema(options)
assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
}
+
+ test("SPARK-45433: inferring the schema when timestamps do not match
specified timestampFormat" +
+ " with only one row") {
+ val options = new CSVOptions(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+ columnPruning = false,
+ defaultTimeZoneId = "UTC")
+ val inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(NullType, "2884-06-24T02:45:51.138") ==
StringType)
+ }
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
index 8290b38e339..81a4858dce8 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -112,4 +112,12 @@ class JsonInferSchemaSuite extends SparkFunSuite with
SQLHelper {
checkType(Map("inferTimestamp" -> "true"), json, TimestampType)
checkType(Map("inferTimestamp" -> "false"), json, StringType)
}
+
+ test("SPARK-45433: inferring the schema when timestamps do not match
specified timestampFormat" +
+ " with only one row") {
+ checkType(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "inferTimestamp" ->
"true"),
+ """{"a": "2884-06-24T02:45:51.138"}""",
+ StringType)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]