This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93cf0df6d9e [SPARK-39849][SQL] Dataset.as(StructType) fills missing 
new columns with null value
93cf0df6d9e is described below

commit 93cf0df6d9eaaa899c4fa2d8756ad16c841e12fb
Author: Cheng Su <[email protected]>
AuthorDate: Fri Jul 29 19:57:16 2022 +0800

    [SPARK-39849][SQL] Dataset.as(StructType) fills missing new columns with 
null value
    
    ### What changes were proposed in this pull request?
    
    As a follow-up to 
https://github.com/apache/spark/pull/37011#discussion_r917700960 , this change 
fills missing new columns with null values instead of failing loudly. Note 
that this only works for nullable columns; a missing non-nullable column or 
field still raises an unresolved column/field error.
    
    ### Why are the changes needed?
    
    This gives a better user experience when using `.as()` to cast to a 
schema that has extra new columns, or new fields in an existing struct type.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the behavior change of the `.as()` API described above.
    
    ### How was this patch tested?
    
    Added unit tests in 
[DataFrameToSchemaSuite.scala](https://github.com/apache/spark/compare/master...c21:spark:default-null?expand=1#diff-115433d6399124014f2600556a5f6ced9b79b64e4584c19d51997c3eff9e6f33)
    
    Closes #37264 from c21/default-null.
    
    Authored-by: Cheng Su <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../plans/logical/basicLogicalOperators.scala      | 65 ++++++++++++++--------
 .../apache/spark/sql/DataFrameToSchemaSuite.scala  | 23 ++++++--
 2 files changed, 60 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 22134a06288..ef5f87b23ec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -172,39 +172,56 @@ object Project {
     expected.map { f =>
       val matched = fields.filter(field => conf.resolver(field._1, f.name))
       if (matched.isEmpty) {
-        if (columnPath.isEmpty) {
-          throw QueryCompilationErrors.unresolvedColumnError(f.name, 
fields.map(_._1))
+        if (f.nullable) {
+          val columnExpr = Literal.create(null, f.dataType)
+          // Fill nullable missing new column with null value.
+          createNewColumn(columnExpr, f.name, f.metadata, Metadata.empty)
         } else {
-          throw QueryCompilationErrors.unresolvedFieldError(f.name, 
columnPath, fields.map(_._1))
+          if (columnPath.isEmpty) {
+            throw QueryCompilationErrors.unresolvedColumnError(f.name, 
fields.map(_._1))
+          } else {
+            throw QueryCompilationErrors.unresolvedFieldError(f.name, 
columnPath, fields.map(_._1))
+          }
         }
       } else if (matched.length > 1) {
         throw QueryCompilationErrors.ambiguousColumnOrFieldError(
           columnPath :+ f.name, matched.length)
-      }
+      } else {
+        val columnExpr = matched.head._2
+        val originalMetadata = columnExpr match {
+          case ne: NamedExpression => ne.metadata
+          case g: GetStructField => g.childSchema(g.ordinal).metadata
+          case _ => Metadata.empty
+        }
 
-      val columnExpr = matched.head._2
-      val originalMetadata = columnExpr match {
-        case ne: NamedExpression => ne.metadata
-        case g: GetStructField => g.childSchema(g.ordinal).metadata
-        case _ => Metadata.empty
-      }
-      val newMetadata = new MetadataBuilder()
-        .withMetadata(originalMetadata)
-        .withMetadata(f.metadata)
-        .build()
-
-      val newColumnPath = columnPath :+ matched.head._1
-      reconcileColumnType(columnExpr, newColumnPath, f.dataType, f.nullable, 
conf) match {
-        case a: Attribute => a.withName(f.name).withMetadata(newMetadata)
-        case other =>
-          if (newMetadata == Metadata.empty) {
-            Alias(other, f.name)()
-          } else {
-            Alias(other, f.name)(explicitMetadata = Some(newMetadata))
-          }
+        val newColumnPath = columnPath :+ matched.head._1
+        val newColumnExpr = reconcileColumnType(
+          columnExpr, newColumnPath, f.dataType, f.nullable, conf)
+        createNewColumn(newColumnExpr, f.name, f.metadata, originalMetadata)
       }
     }
   }
+
+  private def createNewColumn(
+      col: Expression,
+      name: String,
+      newMetadata: Metadata,
+      originalMetadata: Metadata): NamedExpression = {
+    val metadata = new MetadataBuilder()
+      .withMetadata(originalMetadata)
+      .withMetadata(newMetadata)
+      .build()
+
+    col match {
+      case a: Attribute => a.withName(name).withMetadata(metadata)
+      case other =>
+        if (metadata == Metadata.empty) {
+          Alias(other, name)()
+        } else {
+          Alias(other, name)(explicitMetadata = Some(metadata))
+        }
+    }
+  }
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
index 26ddbc4569e..b61b83896e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
@@ -46,8 +46,15 @@ class DataFrameToSchemaSuite extends QueryTest with 
SharedSparkSession {
     checkAnswer(df, Row("b"))
   }
 
-  test("negative: column not found") {
-    val schema = new StructType().add("non_exist", StringType)
+  test("nullable column with default null value") {
+    val schema = new StructType().add("non_exist", StringType).add("j", 
StringType)
+    val df = Seq("a" -> "b").toDF("i", "j").to(schema)
+    assert(df.schema == schema)
+    checkAnswer(df, Row(null, "b"))
+  }
+
+  test("negative: non-nullable column not found") {
+    val schema = new StructType().add("non_exist", StringType, nullable = 
false)
     val e = intercept[SparkThrowable](Seq("a" -> "b").toDF("i", 
"j").to(schema))
     checkError(
       exception = e,
@@ -137,8 +144,16 @@ class DataFrameToSchemaSuite extends QueryTest with 
SharedSparkSession {
     checkAnswer(df, Row(Row("b", "a")))
   }
 
-  test("negative: field not found") {
-    val innerFields = new StructType().add("non_exist", StringType)
+  test("nullable field with default null value") {
+    val innerFields = new StructType().add("J", StringType).add("non_exist", 
StringType)
+    val schema = new StructType().add("struct", innerFields, nullable = false)
+    val df = Seq("a" -> "b").toDF("i", "j").select(struct($"i", 
$"j").as("struct")).to(schema)
+    assert(df.schema == schema)
+    checkAnswer(df, Row(Row("b", null)))
+  }
+
+  test("negative: non-nullable field not found") {
+    val innerFields = new StructType().add("non_exist", StringType, nullable = 
false)
     val schema = new StructType().add("struct", innerFields, nullable = false)
     val e = intercept[SparkThrowable] {
       Seq("a" -> "b").toDF("i", "j").select(struct($"i", 
$"j").as("struct")).to(schema)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to