This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 630aa0bf87a [SPARK-40215][SQL] Add SQL configs to control CSV/JSON date and timestamp parsing behavior
630aa0bf87a is described below

commit 630aa0bf87aee1ccea181532328e3ed25e73d63e
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Fri Aug 26 10:23:04 2022 +0300

    [SPARK-40215][SQL] Add SQL configs to control CSV/JSON date and timestamp parsing behavior
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up for [SPARK-39731](https://issues.apache.org/jira/browse/SPARK-39731) and PR https://github.com/apache/spark/pull/37147.
    
    I found that changing `spark.sql.legacy.timeParserPolicy` to LEGACY just to get lenient date and timestamp inference in CSV and JSON can be problematic. Sometimes it is beneficial to keep the time parser policy as CORRECTED but still use a more lenient date and timestamp inference, e.g. when migrating to a newer Spark version.
    
    I added two separate configs that control this behavior:
    - `spark.sql.legacy.csv.enableDateTimeParsingFallback`
    - `spark.sql.legacy.json.enableDateTimeParsingFallback`
    
    When a config is set to `true`, the legacy (pre-Spark 3.0) time parsing behavior is enabled.
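
    For example, to keep the global policy as CORRECTED while re-enabling the lenient fallback only for CSV (a minimal sketch, assuming an active `SparkSession` named `spark`):

    ```scala
    // Keep global datetime parsing strict (CORRECTED)...
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    // ...but let CSV reads fall back to the legacy (pre-Spark 3.0) parser.
    spark.conf.set("spark.sql.legacy.csv.enableDateTimeParsingFallback", "true")
    ```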
    
    With this PR, the precedence order for CSV is as follows (similar for JSON):
    - data source option `enableDateTimeParsingFallback`;
    - if that is not set, check `spark.sql.legacy.{csv,json}.enableDateTimeParsingFallback`;
    - if that is not set, check `spark.sql.legacy.timeParserPolicy` and whether a custom format is used (see the sketch after this list).
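
    A minimal sketch of that resolution, mirroring the parser changes in the diff below (`options` is the CSV options object; shown here for the date case):

    ```scala
    val enableParsingFallbackForDateType: Boolean =
      options.enableDateTimeParsingFallback                    // 1. per-read data source option
        .orElse(SQLConf.get.csvEnableDateTimeParsingFallback)  // 2. new SQL config
        .getOrElse {
          // 3. legacy policy, or no custom date format supplied
          SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
            options.dateFormatInRead.isEmpty
        }
    ```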
    
    ### Why are the changes needed?
    
    The change makes it easier for users to migrate to a newer Spark version without changing the global config `spark.sql.legacy.timeParserPolicy`. It also allows enabling legacy parsing for CSV and JSON separately, without changing code or the global time parser config.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it simply adds the ability to change the behavior specifically for CSV or JSON.
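
    For instance, the fallback can now also be controlled per read via the existing data source option, which takes precedence over the new configs (a sketch; the schema and path are illustrative):

    ```scala
    val df = spark.read
      .schema("date date, ts timestamp")
      // Per-read option wins over spark.sql.legacy.csv.enableDateTimeParsingFallback.
      .option("enableDateTimeParsingFallback", "true")
      .csv("/path/to/data")  // illustrative path
    ```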
    
    ### How was this patch tested?
    
    I added unit tests for CSV and JSON to verify the flag.
    
    Closes #37653 from sadikovi/SPARK-40215.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 20 ++++++++------
 .../spark/sql/catalyst/json/JacksonParser.scala    | 20 ++++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 22 +++++++++++++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 32 ++++++++++++++++++++++
 .../sql/execution/datasources/json/JsonSuite.scala | 32 ++++++++++++++++++++++
 5 files changed, 110 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index c9955d72524..9d855d1a93d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -124,15 +124,19 @@ class UnivocityParser(
   // dates and timestamps.
   // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
   private val enableParsingFallbackForTimestampType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.timestampFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.timestampFormatInRead.isEmpty
+      }
   private val enableParsingFallbackForDateType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.dateFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.dateFormatInRead.isEmpty
+      }
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 06133d44c13..f8adac1ee44 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -82,15 +82,19 @@ class JacksonParser(
   // dates and timestamps.
   // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
   private val enableParsingFallbackForTimestampType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.timestampFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.timestampFormatInRead.isEmpty
+      }
   private val enableParsingFallbackForDateType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.dateFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.dateFormatInRead.isEmpty
+      }
 
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 31bdbca4a25..eb7a6a9105e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3520,6 +3520,22 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
+    buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
+      .internal()
+      .doc("When true, enable legacy date/time parsing fallback in CSV")
+      .version("3.4.0")
+      .booleanConf
+      .createOptional
+
+  val LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK =
+    buildConf("spark.sql.legacy.json.enableDateTimeParsingFallback")
+      .internal()
+      .doc("When true, enable legacy date/time parsing fallback in JSON")
+      .version("3.4.0")
+      .booleanConf
+      .createOptional
+
   val ADD_PARTITION_BATCH_SIZE =
     buildConf("spark.sql.addPartitionInBatch.size")
       .internal()
@@ -4621,6 +4637,12 @@ class SQLConf extends Serializable with Logging {
 
   def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
 
+  def jsonEnableDateTimeParsingFallback: Option[Boolean] =
+    getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK)
+
+  def csvEnableDateTimeParsingFallback: Option[Boolean] =
+    getConf(LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK)
+
   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
 
   def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0068f57a769..5c97821f11e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2949,6 +2949,38 @@ abstract class CSVSuite
       )
     }
   }
+
+  test("SPARK-40215: enable parsing fallback for CSV in CORRECTED mode with a 
SQL config") {
+    withTempPath { path =>
+      Seq("2020-01-01,2020-01-01").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (fallbackEnabled <- Seq(true, false)) {
+        withSQLConf(
+            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
+            SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
+          val df = spark.read
+            .schema("date date, ts timestamp")
+            .option("dateFormat", "invalid")
+            .option("timestampFormat", "invalid")
+            .csv(path.getAbsolutePath)
+
+          if (fallbackEnabled) {
+            checkAnswer(
+              df,
+              Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+            )
+          } else {
+            checkAnswer(
+              df,
+              Seq(Row(null, null))
+            )
+          }
+        }
+      }
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 02225d40c83..f0801ae313e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3322,6 +3322,38 @@ abstract class JsonSuite
       )
     }
   }
+
+  test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a 
SQL config") {
+    withTempPath { path =>
+      Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (fallbackEnabled <- Seq(true, false)) {
+        withSQLConf(
+            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
+            SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
+          val df = spark.read
+            .schema("date date, ts timestamp")
+            .option("dateFormat", "invalid")
+            .option("timestampFormat", "invalid")
+            .json(path.getAbsolutePath)
+
+          if (fallbackEnabled) {
+            checkAnswer(
+              df,
+              Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+            )
+          } else {
+            checkAnswer(
+              df,
+              Seq(Row(null, null))
+            )
+          }
+        }
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {

