This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c2536a7eabd [SPARK-39469][SQL] Infer date type for CSV schema inference
c2536a7eabd is described below
commit c2536a7eabd8764cbbaaff22935e19685b92f22b
Author: Jonathan Cui <[email protected]>
AuthorDate: Thu Jul 21 17:04:44 2022 +0800
[SPARK-39469][SQL] Infer date type for CSV schema inference
### What changes were proposed in this pull request?
1. Add a new `inferDate` option to CSV options. The description is:
> Whether or not to infer columns that satisfy the `dateFormat` option as
`Date`. Requires `inferSchema` to be true. When `false`, columns with dates
will be inferred as `String` (or as `Timestamp` if it fits the
`timestampFormat`). Legacy date formats in `Timestamp` columns cannot be
parsed with this option.

An error will be thrown if `inferDate` is true while the SQL configuration
`spark.sql.legacy.timeParserPolicy` is `LEGACY`. This avoids incorrect schema
inference from legacy time parsers, which do not do strict parsing.

The `inferDate` option should prevent performance degradation for users who
don't opt in.
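For illustration, a minimal opt-in sketch (the path `/tmp/dates.csv` and its contents are hypothetical):

```scala
// Hypothetical input: /tmp/dates.csv holds a single column of "yyyy-MM-dd" strings.
val df = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .option("inferDate", "true") // opt in to date inference
  .option("dateFormat", "yyyy-MM-dd")
  .csv("/tmp/dates.csv")
df.printSchema() // _c0 should now be inferred as date rather than string or timestamp
```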
2. Modify `inferField` in `CSVInferSchema.scala` to include the Date type.
If `typeSoFar` in `inferField` is Date, Timestamp or TimestampNTZ, we first
attempt to parse a Date and then a Timestamp/TimestampNTZ. We attempt to
parse dates even when `typeSoFar` is Timestamp/TimestampNTZ to cover the case
where a column contains a timestamp entry followed by a date entry: both data
types should be detected, and the column inferred as timestamp type.
Example:
```
Seq("2010|10|10", "2010_10_10")
.toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
spark.read
.option("inferSchema", "true")
.option("header", "false")
.option("dateFormat", "yyyy|MM|dd")
.option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
```
Result:
```
root
 |-- _c0: timestamp (nullable = true)
```
3. Modify `makeConverter` in `UnivocityParser` to handle date entries in a
timestamp column, so that the example above parses correctly; a sketch of the
resulting behavior follows.
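A hedged sketch of that behavior (the printed output is the expected result under this PR, not captured from a run):

```scala
// Continuing the example above: the column was inferred as timestamp, so the
// date entry "2010|10|10" fails timestampFormatter.parse, falls back to
// dateFormatter.parse, and is converted with daysToMicros using the
// configured time zone (options.zoneId).
spark.read
  .option("inferSchema", "true")
  .option("inferDate", "true")
  .option("dateFormat", "yyyy|MM|dd")
  .option("timestampFormat", "yyyy_MM_dd")
  .csv("/tmp/foo")
  .show(false)
// +-------------------+
// |_c0                |
// +-------------------+
// |2010-10-10 00:00:00|
// |2010-10-10 00:00:00|
// +-------------------+
```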
### Does this PR introduce _any_ user-facing change?
The new behavior of schema inference when `inferDate = true`:
1. If a column contains only dates, it is inferred as `date` type in the
schema. If the date format and the timestamp format are identical (e.g. both
are `yyyy/MM/dd`), entries default to being interpreted as `date`.
2. If a column contains both dates and timestamps, it is inferred as
`timestamp` type. A sketch of both cases follows this list.
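A minimal sketch of both behaviors (the path `/tmp/mixed` and the data are hypothetical, and the printed schema is the expected result):

```scala
// _c0 holds only dates; _c1 mixes a timestamp with a date.
Seq("2020-01-01,2020-01-01T10:00:00", "2020-01-02,2020-01-02")
  .toDF.repartition(1).write.mode("overwrite").text("/tmp/mixed")
spark.read
  .option("inferSchema", "true")
  .option("inferDate", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .csv("/tmp/mixed")
  .printSchema()
// root
//  |-- _c0: date (nullable = true)      (dates only)
//  |-- _c1: timestamp (nullable = true) (date and timestamp mixed)
```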
### How was this patch tested?
Unit tests were added to `CSVInferSchemaSuite` and `UnivocityParserSuite`.
An end-to-end test was added to `CSVSuite`.
### Benchmarks:
`inferDate` increases parsing/inference time in general. The impact scales
with the number of rows (not with the number of columns). For columns of date
type (which would be inferred as timestamp when `inferDate=false`), inference
and parsing take about 30% longer. The impact is much greater on columns of
timestamp type (about 30x longer than `inferDate=false`), because during
inference each timestamp is first tried as a date, which throws an error.
#### Number of seconds taken to parse each CSV file with `inferDate=true` and `inferDate=false`

|                                             | inferDate=false | inferDate=true | master branch |
|---------------------------------------------|-----------------|----------------|---------------|
| Small file (<100 row/col). Mixed data types | 0.32            | 0.33           |               |
| 100K rows. 4 columns. Mixed data types.     | 0.70            | 2.80           | 0.70          |
| 20k columns. 4 rows. Mixed data types.      | 16.32           | 15.90          | 13.5          |
| Large file. Only date type.                 | 2.15            | 3.70           | 2.10          |
| Large file. Only timestamp type.            | 2.60            | 77.00          | 2.30          |
Results are the average of 3 trials on the same machine.
Over multiple runs, master branch benchmark times have also come in slower
than `inferDate=false` (although the average is slightly faster). Given the
+/- 20% variance between trials, master branch results are roughly on par
with the `inferDate=false` results.
Closes #36871 from Jonathancui123/SPARK-39469-date-infer.
Lead-authored-by: Jonathan Cui <[email protected]>
Co-authored-by: Jonathan Cui <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
core/src/main/resources/error/error-classes.json | 6 +++
docs/sql-data-sources-csv.md | 6 +++
.../spark/sql/catalyst/csv/CSVInferSchema.scala | 21 ++++++++-
.../apache/spark/sql/catalyst/csv/CSVOptions.scala | 24 +++++++++-
.../spark/sql/catalyst/csv/UnivocityParser.scala | 35 +++++++++-----
.../spark/sql/errors/QueryExecutionErrors.scala | 8 +++-
.../sql/catalyst/csv/CSVInferSchemaSuite.scala | 55 ++++++++++++++++++++++
.../sql/catalyst/csv/UnivocityParserSuite.scala | 23 +++++++++
.../test/resources/test-data/date-infer-schema.csv | 4 ++
.../sql/execution/datasources/csv/CSVSuite.scala | 52 ++++++++++++++++++++
10 files changed, 219 insertions(+), 15 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d49239c29f5..e2a99c1a62e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -23,6 +23,12 @@
     ],
     "sqlState" : "22005"
   },
+  "CANNOT_INFER_DATE" : {
+    "message" : [
+      "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date."
+    ],
+    "sqlState" : "22007"
+  },
   "CANNOT_PARSE_DECIMAL" : {
     "message" : [
       "Cannot parse decimal"
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 1be1d7446e8..8384f8332a6 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
     <td>Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.</td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>inferDate</code></td>
+    <td>false</td>
+    <td>Whether or not to infer columns that satisfy the <code>dateFormat</code> option as <code>Date</code>. Requires <code>inferSchema</code> to be <code>true</code>. When <code>false</code>, columns with dates will be inferred as <code>String</code> (or as <code>Timestamp</code> if it fits the <code>timestampFormat</code>).</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>enforceSchema</code></td>
     <td>true</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 8b0c6c49b85..3132fea8700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormatInRead,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true)
+
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
     s: String => new java.math.BigDecimal(s)
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       case LongType => tryParseLong(field)
       case _: DecimalType => tryParseDecimal(field)
       case DoubleType => tryParseDouble(field)
+      case DateType => tryParseDateTime(field)
+      case TimestampNTZType if options.inferDate => tryParseDateTime(field)
       case TimestampNTZType => tryParseTimestampNTZ(field)
+      case TimestampType if options.inferDate => tryParseDateTime(field)
       case TimestampType => tryParseTimestamp(field)
       case BooleanType => tryParseBoolean(field)
       case StringType => StringType
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
+    } else if (options.inferDate) {
+      tryParseDateTime(field)
+    } else {
+      tryParseTimestampNTZ(field)
+    }
+  }
+
+  private def tryParseDateTime(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestampNTZ(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3e92c3d25eb..a033e3a3a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -148,7 +148,28 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
-  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters
+   */
+  val inferDate = {
+    val inferDateFlag = getBool("inferDate")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    }
+    inferDateFlag
+  }
+
+  // Provide a default value for dateFormatInRead when inferDate. This ensures that the
+  // Iso8601DateFormatter (with strict date parsing) is used for date inference
+  val dateFormatInRead: Option[String] =
+    if (inferDate) {
+      Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+    } else {
+      parameters.get("dateFormat")
+    }
   val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
 
   val timestampFormatInRead: Option[String] =
@@ -195,7 +216,6 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)
 
-
   /**
    * String representation of an empty value in read and in write.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56ebfcc26c6..0237b6c454d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -197,34 +198,46 @@ class UnivocityParser(
         Decimal(decimalParser(datum), dt.precision, dt.scale)
       }
 
-    case _: TimestampType => (d: String) =>
+    case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          timestampFormatter.parse(datum)
+          dateFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
       }
 
-    case _: TimestampNTZType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum, false)
-      }
-
-    case _: DateType => (d: String) =>
+    case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          dateFormatter.parse(datum)
+          timestampFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToDate(str).getOrElse(throw e)
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+              // There may be date type entries in timestamp column due to schema inference
+              if (options.inferDate) {
+                daysToMicros(dateFormatter.parse(datum), options.zoneId)
+              } else {
+                throw(e)
+              }
+            }
+        }
+      }
+
+    case _: TimestampNTZType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampNTZFormatter.parseWithoutTimeZone(datum, false)
+        } catch {
+          case NonFatal(e) if (options.inferDate) =>
+            daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
         }
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 64e6283c0e3..1ef31673d6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
-import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
+import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
@@ -528,6 +528,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       """.stripMargin)
   }
 
+  def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = {
+    new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE",
+      messageParameters = Array()
+    )
+  }
+
   def streamedOperatorUnsupportedByDataSourceError(
       className: String, operator: String): Throwable = {
     new UnsupportedOperationException(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index d268f8c2e72..8790223a680 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(
       inferSchema.mergeRowTypes(Array(DoubleType),
         Array(LongType)).sameElements(Array(DoubleType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
   }
test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,53 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("en-US").foreach(checkDecimalInfer(_, StringType))
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("SPARK-39469: inferring date type") {
+    // "yyyy/MM/dd" format
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      false, "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+    // "MMM yyyy" format
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> "true"),
+      false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+    // Field should strictly match date format to infer as date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+        "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
+
+  test("SPARK-39469: inferring date and timestamp types in a mixed column with inferDate=true") {
+    var options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+        "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+    assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    // SQL configuration must be set to default to TimestampNTZ
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
+    }
+
+    // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+
+    // No errors when Date and Timestamp have the same format. Inference defaults to date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 4166401d040..2589376bc3d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.{ZoneOffset}
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
   }
+
+  test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
+    def checkDate(dataType: DataType): Unit = {
+      val timestampsOptions =
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
+          "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
+          false, DateTimeUtils.getZoneId("-08:00").toString)
+      // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always
+      // converted to their equivalent UTC timestamp
+      val dateString = "08_09_2001"
+      val expected = dataType match {
+        case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
+        case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+        case DateType => days(2001, 9, 8)
+      }
+      val parser = new UnivocityParser(new StructType(), timestampsOptions)
+      assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
+    }
+    checkDate(TimestampType)
+    checkDate(TimestampNTZType)
+    checkDate(DateType)
+  }
 }
diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
new file mode 100644
index 00000000000..ca16ec81e6d
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+2001-09-08,2014-10-27T18:30:00,1765-03-28
+1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00
+0293-11-07,1995-06-25,2016-01-28T20:00:00
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf92ffcf465..758f5430608 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -74,6 +75,7 @@ abstract class CSVSuite
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2790,56 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39469: Infer schema for date type") {
+    val options1 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd",
+      "inferDate" -> "true")
+    val options2 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "inferDate" -> "true")
+
+    // Error should be thrown when attempting to inferDate with Legacy parser
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      val msg = intercept[IllegalArgumentException] {
+        spark.read
+          .format("csv")
+          .options(options1)
+          .load(testFile(dateInferSchemaFile))
+      }.getMessage
+      assert(msg.contains("CANNOT_INFER_DATE"))
+    } else {
+      // 1. Specify date format and timestamp format
+      // 2. Date inference should work with default date format when dateFormat is not provided
+      Seq(options1, options2).foreach {options =>
+        val results = spark.read
+          .format("csv")
+          .options(options)
+          .load(testFile(dateInferSchemaFile))
+
+        val expectedSchema = StructType(List(StructField("date", DateType),
+          StructField("timestamp-date", TimestampType),
+          StructField("date-timestamp", TimestampType)))
+        assert(results.schema == expectedSchema)
+
+        val expected =
+          Seq(
+            Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+              Timestamp.valueOf("1765-03-28 00:00:0.0")),
+            Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+              Timestamp.valueOf("1423-11-12 23:41:0.0")),
+            Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+              Timestamp.valueOf("2016-01-28 20:00:00.0"))
+          )
+        assert(results.collect().toSeq.map(_.toSeq) == expected)
+      }
+
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]