This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 93cf0df6d9e [SPARK-39849][SQL] Dataset.as(StructType) fills missing
new columns with null value
93cf0df6d9e is described below
commit 93cf0df6d9eaaa899c4fa2d8756ad16c841e12fb
Author: Cheng Su <[email protected]>
AuthorDate: Fri Jul 29 19:57:16 2022 +0800
[SPARK-39849][SQL] Dataset.as(StructType) fills missing new columns with
null value
### What changes were proposed in this pull request?
As a followup of
https://github.com/apache/spark/pull/37011#discussion_r917700960, it would be
great to fill missing new columns with null values, instead of failing out
loud. Note it would only work for nullable columns.
### Why are the changes needed?
Better user experience when using `.as()` to cast with extra new columns,
or new fields in an existing struct type.
### Does this PR introduce _any_ user-facing change?
Yes, the exact described behavior change of the `.as()` API.
### How was this patch tested?
Added unit test in
[DataFrameToSchemaSuite.scala](https://github.com/apache/spark/compare/master...c21:spark:default-null?expand=1#diff-115433d6399124014f2600556a5f6ced9b79b64e4584c19d51997c3eff9e6f33)
Closes #37264 from c21/default-null.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../plans/logical/basicLogicalOperators.scala | 65 ++++++++++++++--------
.../apache/spark/sql/DataFrameToSchemaSuite.scala | 23 ++++++--
2 files changed, 60 insertions(+), 28 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 22134a06288..ef5f87b23ec 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -172,39 +172,56 @@ object Project {
expected.map { f =>
val matched = fields.filter(field => conf.resolver(field._1, f.name))
if (matched.isEmpty) {
- if (columnPath.isEmpty) {
- throw QueryCompilationErrors.unresolvedColumnError(f.name,
fields.map(_._1))
+ if (f.nullable) {
+ val columnExpr = Literal.create(null, f.dataType)
+ // Fill nullable missing new column with null value.
+ createNewColumn(columnExpr, f.name, f.metadata, Metadata.empty)
} else {
- throw QueryCompilationErrors.unresolvedFieldError(f.name,
columnPath, fields.map(_._1))
+ if (columnPath.isEmpty) {
+ throw QueryCompilationErrors.unresolvedColumnError(f.name,
fields.map(_._1))
+ } else {
+ throw QueryCompilationErrors.unresolvedFieldError(f.name,
columnPath, fields.map(_._1))
+ }
}
} else if (matched.length > 1) {
throw QueryCompilationErrors.ambiguousColumnOrFieldError(
columnPath :+ f.name, matched.length)
- }
+ } else {
+ val columnExpr = matched.head._2
+ val originalMetadata = columnExpr match {
+ case ne: NamedExpression => ne.metadata
+ case g: GetStructField => g.childSchema(g.ordinal).metadata
+ case _ => Metadata.empty
+ }
- val columnExpr = matched.head._2
- val originalMetadata = columnExpr match {
- case ne: NamedExpression => ne.metadata
- case g: GetStructField => g.childSchema(g.ordinal).metadata
- case _ => Metadata.empty
- }
- val newMetadata = new MetadataBuilder()
- .withMetadata(originalMetadata)
- .withMetadata(f.metadata)
- .build()
-
- val newColumnPath = columnPath :+ matched.head._1
- reconcileColumnType(columnExpr, newColumnPath, f.dataType, f.nullable,
conf) match {
- case a: Attribute => a.withName(f.name).withMetadata(newMetadata)
- case other =>
- if (newMetadata == Metadata.empty) {
- Alias(other, f.name)()
- } else {
- Alias(other, f.name)(explicitMetadata = Some(newMetadata))
- }
+ val newColumnPath = columnPath :+ matched.head._1
+ val newColumnExpr = reconcileColumnType(
+ columnExpr, newColumnPath, f.dataType, f.nullable, conf)
+ createNewColumn(newColumnExpr, f.name, f.metadata, originalMetadata)
}
}
}
+
+ private def createNewColumn(
+ col: Expression,
+ name: String,
+ newMetadata: Metadata,
+ originalMetadata: Metadata): NamedExpression = {
+ val metadata = new MetadataBuilder()
+ .withMetadata(originalMetadata)
+ .withMetadata(newMetadata)
+ .build()
+
+ col match {
+ case a: Attribute => a.withName(name).withMetadata(metadata)
+ case other =>
+ if (metadata == Metadata.empty) {
+ Alias(other, name)()
+ } else {
+ Alias(other, name)(explicitMetadata = Some(metadata))
+ }
+ }
+ }
}
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
index 26ddbc4569e..b61b83896e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
@@ -46,8 +46,15 @@ class DataFrameToSchemaSuite extends QueryTest with
SharedSparkSession {
checkAnswer(df, Row("b"))
}
- test("negative: column not found") {
- val schema = new StructType().add("non_exist", StringType)
+ test("nullable column with default null value") {
+ val schema = new StructType().add("non_exist", StringType).add("j",
StringType)
+ val df = Seq("a" -> "b").toDF("i", "j").to(schema)
+ assert(df.schema == schema)
+ checkAnswer(df, Row(null, "b"))
+ }
+
+ test("negative: non-nullable column not found") {
+ val schema = new StructType().add("non_exist", StringType, nullable =
false)
val e = intercept[SparkThrowable](Seq("a" -> "b").toDF("i",
"j").to(schema))
checkError(
exception = e,
@@ -137,8 +144,16 @@ class DataFrameToSchemaSuite extends QueryTest with
SharedSparkSession {
checkAnswer(df, Row(Row("b", "a")))
}
- test("negative: field not found") {
- val innerFields = new StructType().add("non_exist", StringType)
+ test("nullable field with default null value") {
+ val innerFields = new StructType().add("J", StringType).add("non_exist",
StringType)
+ val schema = new StructType().add("struct", innerFields, nullable = false)
+ val df = Seq("a" -> "b").toDF("i", "j").select(struct($"i",
$"j").as("struct")).to(schema)
+ assert(df.schema == schema)
+ checkAnswer(df, Row(Row("b", null)))
+ }
+
+ test("negative: non-nullable field not found") {
+ val innerFields = new StructType().add("non_exist", StringType, nullable =
false)
val schema = new StructType().add("struct", innerFields, nullable = false)
val e = intercept[SparkThrowable] {
Seq("a" -> "b").toDF("i", "j").select(struct($"i",
$"j").as("struct")).to(schema)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]