This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0f2e3ecb994 [SPARK-35912][SQL][FOLLOW-UP] Add a legacy configuration for respecting nullability in DataFrame.schema.csv/json(ds)
0f2e3ecb994 is described below

commit 0f2e3ecb9943aec91204c168b6402f3e5de53ca2
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu May 5 16:23:28 2022 +0900

    [SPARK-35912][SQL][FOLLOW-UP] Add a legacy configuration for respecting nullability in DataFrame.schema.csv/json(ds)
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up of https://github.com/apache/spark/pull/33436 that adds a legacy configuration. It was found that the original change can break a valid use case (https://github.com/apache/spark/pull/33436/files#r863271189):
    
    ```scala
    import org.apache.spark.sql.types._
    val ds = Seq("a,", "a,b").toDS
    spark.read.schema(
      StructType(
        StructField("f1", StringType, nullable = false) ::
        StructField("f2", StringType, nullable = false) :: Nil)
      ).option("mode", "DROPMALFORMED").csv(ds).show()
    ```
    
    **Before:**
    
    ```
    +---+---+
    | f1| f2|
    +---+---+
    |  a|  b|
    +---+---+
    ```
    
    **After:**
    
    ```
    +---+----+
    | f1|  f2|
    +---+----+
    |  a|null|
    |  a|   b|
    +---+----+
    ```
    
    This PR adds a configuration to restore **Before** behaviour.
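    
    For illustration, a minimal sketch (assuming a `spark-shell` session, so `spark.implicits._` is in scope as in the snippet above) of restoring the **Before** output with the new flag:
    
    ```scala
    import org.apache.spark.sql.types._
    
    // Enable the legacy flag so the user-specified non-nullable schema is
    // respected again; with DROPMALFORMED, the malformed row "a," is dropped.
    spark.conf.set("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
    
    val ds = Seq("a,", "a,b").toDS
    spark.read.schema(
      StructType(
        StructField("f1", StringType, nullable = false) ::
        StructField("f2", StringType, nullable = false) :: Nil)
      ).option("mode", "DROPMALFORMED").csv(ds).show()
    // Expected: only the row ("a", "b"), matching the **Before** output.
    ```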
    
    ### Why are the changes needed?
    
    To avoid breaking valid use cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds a new configuration `spark.sql.legacy.respectNullabilityInTextDatasetConversion` (`false` by default). When it is set to `true`, the nullability of the user-specified schema is respected in `DataFrameReader.schema(schema).csv(dataset)` and `DataFrameReader.schema(schema).json(dataset)`.
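    
    As a sketch (the builder API shown is standard Spark and not specific to this change), the flag can also be set when the session is created so that it applies to all such reads:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Enable the legacy behaviour for the whole session so that
    // DataFrameReader.schema(...).csv/json(dataset) keeps the user-specified
    // nullability instead of forcing every field to nullable.
    val spark = SparkSession.builder()
      .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
      .getOrCreate()
    ```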
    
    ### How was this patch tested?
    
    Unit tests were added.
    
    Closes #36435 from HyukjinKwon/SPARK-35912.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 6689b97ec76abe5bab27f02869f8f16b32530d1a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 docs/sql-migration-guide.md                                |  2 +-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala  | 13 +++++++++++--
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 10 ++++++++++
 .../spark/sql/execution/datasources/json/JsonSuite.scala   | 14 +++++++++++++-
 5 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index b6bfb0ed2be..a7757d6c9a0 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,7 +30,7 @@ license: |
 
   - Since Spark 3.3, the functions `lpad` and `rpad` have been overloaded to support byte sequences. When the first argument is a byte sequence, the optional padding pattern must also be a byte sequence and the result is a BINARY value. The default padding pattern in this case is the zero byte. To restore the legacy behavior of always returning string types, set `spark.sql.legacy.lpadRpadAlwaysReturnString` to `true`.
 
-  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
+  - Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields. To restore the legacy behavior of respecting the nullability, set `spark.sql.legacy.respectNullabilityInTextDatasetConversion` to `true`.
 
   - Since Spark 3.3, when the date or timestamp pattern is not specified, Spark converts an input string to a date/timestamp using the `CAST` expression approach. The changes affect CSV/JSON datasources and parsing of partition values. In Spark 3.2 or earlier, when the date or timestamp pattern is not set, Spark uses the default patterns: `yyyy-MM-dd` for dates and `yyyy-MM-dd HH:mm:ss` for timestamps. After the changes, Spark still recognizes the pattern together with
     
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 76f3d1f5a84..b6230f71383 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2983,6 +2983,17 @@ object SQLConf {
     .intConf
     .createOptional
 
+  val LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION =
+    buildConf("spark.sql.legacy.respectNullabilityInTextDatasetConversion")
+      .internal()
+      .doc("When true, the nullability in the user-specified schema for " +
+        "`DataFrameReader.schema(schema).json(jsonDataset)` and " +
+        "`DataFrameReader.schema(schema).csv(csvDataset)` is respected. Otherwise, they are " +
+        "turned to a nullable schema forcibly.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val REPL_EAGER_EVAL_ENABLED = buildConf("spark.sql.repl.eagerEval.enabled")
     .doc("Enables eager evaluation or not. When true, the top K rows of Dataset will be " +
       "displayed if and only if the REPL supports the eager evaluation. Currently, the " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cab0ea2b30a..344f40eef45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -406,7 +407,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
     userSpecifiedSchema.foreach(ExprUtils.checkJsonSchema(_).foreach(throw _))
-    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
+    val schema = userSpecifiedSchema.map {
+      case s if !SQLConf.get.getConf(
+        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
+      case other => other
+    }.getOrElse {
       TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
     }
 
@@ -478,7 +483,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         None
       }
 
-    val schema = userSpecifiedSchema.map(_.asNullable).getOrElse {
+    val schema = userSpecifiedSchema.map {
+      case s if !SQLConf.get.getConf(
+        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
+      case other => other
+    }.getOrElse {
       TextInputCSVDataSource.inferFromDataset(
         sparkSession,
         csvDataset,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7cbe6ed9fce..dd42f48d716 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2693,6 +2693,16 @@ abstract class CSVSuite
       assert(df.schema == expected)
       checkAnswer(df, Row(1, null) :: Nil)
     }
+
+    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+      checkAnswer(
+        spark.read.schema(
+          StructType(
+            StructField("f1", StringType, nullable = false) ::
+            StructField("f2", StringType, nullable = false) :: Nil)
+        ).option("mode", "DROPMALFORMED").csv(Seq("a,", "a,b").toDS),
+        Row("a", "b"))
+    }
   }
 
   test("SPARK-36536: use casting when datetime pattern is not set") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0897ad2ff30..bc7c6e56ece 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3165,7 +3165,7 @@ abstract class JsonSuite
     Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
       Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode =>
         val json = spark.createDataset(
-          spark.sparkContext.parallelize(jsonString:: Nil))(Encoders.STRING)
+          spark.sparkContext.parallelize(jsonString :: Nil))(Encoders.STRING)
         val df = spark.read
           .option("mode", mode)
           .schema(schema)
@@ -3174,6 +3174,18 @@ abstract class JsonSuite
         checkAnswer(df, Row(1, null) :: Nil)
       }
     }
+
+    withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+      checkAnswer(
+        spark.read.schema(
+          StructType(
+            StructField("f1", LongType, nullable = false) ::
+            StructField("f2", LongType, nullable = false) :: Nil)
+        ).option("mode", "DROPMALFORMED").json(Seq("""{"f1": 1}""").toDS),
+        // It is for testing legacy configuration. This is technically a bug as
+        // `0` has to be `null` but the schema is non-nullable.
+        Row(1, 0))
+    }
   }
 
   test("SPARK-36379: proceed parsing with root nulls in permissive mode") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
