This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 95796ac [SPARK-31256][SQL] DataFrameNaFunctions.drop should work for
nested columns
95796ac is described below
commit 95796ac7dafcfc2a7b57c22f6822d4a5f4d8a1ae
Author: Terry Kim <[email protected]>
AuthorDate: Mon Apr 20 02:59:09 2020 +0000
[SPARK-31256][SQL] DataFrameNaFunctions.drop should work for nested columns
### What changes were proposed in this pull request?
#26700 removed the ability to drop a row whose nested column value is null.
For example, for the following `df`:
```
val schema = new StructType()
.add("c1", new StructType()
.add("c1-1", StringType)
.add("c1-2", StringType))
val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show
+--------+
| c1|
+--------+
| [, a2]|
|[b1, b2]|
| null|
+--------+
```
In Spark 2.4.4,
```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
| c1|
+--------+
|[b1, b2]|
+--------+
```
In Spark 2.4.5 or Spark 3.0.0-preview2, if nested columns are specified,
they are ignored.
```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
| c1|
+--------+
| [, a2]|
|[b1, b2]|
| null|
+--------+
```
### Why are the changes needed?
This seems like a regression.
### Does this PR introduce any user-facing change?
Now, the nested column can be specified:
```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
| c1|
+--------+
|[b1, b2]|
+--------+
```
Also, if `*` is specified as a column, it will throw an `AnalysisException`
stating that `*` cannot be resolved, which was the behavior in 2.4.4. Currently,
in master, specifying `*` has no effect.
### How was this patch tested?
Updated existing tests.
Closes #28266 from imback82/SPARK-31256.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d7499aed9cc943304e2ec89379d3651410f6ca90)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/DataFrameNaFunctions.scala | 12 +++---
.../spark/sql/DataFrameNaFunctionsSuite.scala | 48 ++++++++++++++--------
2 files changed, 35 insertions(+), 25 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index 953db80..bbf0ac1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -89,7 +89,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* @since 1.3.1
*/
def drop(how: String, cols: Seq[String]): DataFrame = {
- drop0(how, toAttributes(cols))
+ drop0(how, cols.map(df.resolve(_)))
}
/**
@@ -115,7 +115,7 @@ final class DataFrameNaFunctions private[sql](df:
DataFrame) {
* @since 1.3.1
*/
def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
- drop0(minNonNulls, toAttributes(cols))
+ drop0(minNonNulls, cols.map(df.resolve(_)))
}
/**
@@ -480,7 +480,7 @@ final class DataFrameNaFunctions private[sql](df:
DataFrame) {
df.queryExecution.analyzed.output
}
- private def drop0(how: String, cols: Seq[Attribute]): DataFrame = {
+ private def drop0(how: String, cols: Seq[NamedExpression]): DataFrame = {
how.toLowerCase(Locale.ROOT) match {
case "any" => drop0(cols.size, cols)
case "all" => drop0(1, cols)
@@ -488,12 +488,10 @@ final class DataFrameNaFunctions private[sql](df:
DataFrame) {
}
}
- private def drop0(minNonNulls: Int, cols: Seq[Attribute]): DataFrame = {
+ private def drop0(minNonNulls: Int, cols: Seq[NamedExpression]): DataFrame =
{
// Filtering condition:
// only keep the row if it has at least `minNonNulls` non-null and non-NaN
values.
- val predicate = AtLeastNNonNulls(
- minNonNulls,
- outputAttributes.filter{ col => cols.exists(_.semanticEquals(col)) })
+ val predicate = AtLeastNNonNulls(minNonNulls, cols)
df.filter(Column(predicate))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index fb1ca69..091877f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -45,6 +45,16 @@ class DataFrameNaFunctionsSuite extends QueryTest with
SharedSparkSession {
).toDF("int", "long", "short", "byte", "float", "double")
}
+ def createDFWithNestedColumns: DataFrame = {
+ val schema = new StructType()
+ .add("c1", new StructType()
+ .add("c1-1", StringType)
+ .add("c1-2", StringType))
+ val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
+ spark.createDataFrame(
+ spark.sparkContext.parallelize(data), schema)
+ }
+
test("drop") {
val input = createDF()
val rows = input.collect()
@@ -275,33 +285,35 @@ class DataFrameNaFunctionsSuite extends QueryTest with
SharedSparkSession {
assert(message.contains("Reference 'f2' is ambiguous"))
}
- test("fill/drop with col(*)") {
+ test("fill with col(*)") {
val df = createDF()
// If columns are specified with "*", they are ignored.
checkAnswer(df.na.fill("new name", Seq("*")), df.collect())
- checkAnswer(df.na.drop("any", Seq("*")), df.collect())
}
- test("fill/drop with nested columns") {
- val schema = new StructType()
- .add("c1", new StructType()
- .add("c1-1", StringType)
- .add("c1-2", StringType))
+ test("drop with col(*)") {
+ val df = createDF()
+ val exception = intercept[AnalysisException] {
+ df.na.drop("any", Seq("*"))
+ }
+ assert(exception.getMessage.contains("Cannot resolve column name \"*\""))
+ }
- val data = Seq(
- Row(Row(null, "a2")),
- Row(Row("b1", "b2")),
- Row(null))
+ test("fill with nested columns") {
+ val df = createDFWithNestedColumns
- val df = spark.createDataFrame(
- spark.sparkContext.parallelize(data), schema)
+ // Nested columns are ignored for fill().
+ checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), df)
+ }
- checkAnswer(df.select("c1.c1-1"),
- Row(null) :: Row("b1") :: Row(null) :: Nil)
+ test("drop with nested columns") {
+ val df = createDFWithNestedColumns
- // Nested columns are ignored for fill() and drop().
- checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), data)
- checkAnswer(df.na.drop("any", Seq("c1.c1-1")), data)
+ // Rows with the specified nested columns whose null values are dropped.
+ assert(df.count == 3)
+ checkAnswer(
+ df.na.drop("any", Seq("c1.c1-1")),
+ Seq(Row(Row("b1", "b2"))))
}
test("replace") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]