Repository: spark
Updated Branches:
refs/heads/branch-2.3 5b5851cb6 -> eb4fa551e
[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data frames
## What changes were proposed in this pull request?
(courtesy of liancheng)
Spark SQL supports both global aggregation and grouping aggregation. Global
aggregation always return a single row with the initial aggregation state as
the output, even there are zero input rows. Spark implements this by simply
checking the number of grouping keys and treats an aggregation as a global
aggregation if it has zero grouping keys.
However, this simple principle drops the ball in the following case:
```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```
The reason is that:
1. `df.dropDuplicates()` is roughly translated into something equivalent to:
```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```
This translation is implemented in the rule `ReplaceDeduplicateWithAggregate`.
2. `spark.emptyDataFrame` contains zero columns and zero rows.
Therefore, rule `ReplaceDeduplicateWithAggregate` makes a confusing
transformation roughly equivalent to the following one:
```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```
The above transformation is confusing because the resulting aggregate operator
contains no grouping keys (because `emptyDataFrame` contains no columns), and
gets recognized as a global aggregation. As a result, Spark SQL allocates a
single row filled by the initial aggregation state and uses it as the output,
and returns a wrong result.
To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by
appending a literal `1` to the grouping key list of the resulting `Aggregate`
operator when the input plan contains zero output columns. In this way,
`spark.emptyDataFrame.dropDuplicates()` is now translated into a grouping
aggregation, roughly depicted as:
```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```
Which is now properly treated as a grouping aggregation and returns the correct
answer.
## How was this patch tested?
New unit tests added
Author: Feng Liu <[email protected]>
Closes #20174 from liufengdb/fix-duplicate.
(cherry picked from commit 9b33dfc408de986f4203bb0ac0c3f5c56effd69d)
Signed-off-by: Cheng Lian <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb4fa551
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb4fa551
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb4fa551
Branch: refs/heads/branch-2.3
Commit: eb4fa551e60800269a939b2c1c0ad69e3a801264
Parents: 5b5851c
Author: Feng Liu <[email protected]>
Authored: Wed Jan 10 14:25:04 2018 -0800
Committer: Cheng Lian <[email protected]>
Committed: Wed Jan 10 14:25:33 2018 -0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 8 ++++++-
.../optimizer/ReplaceOperatorSuite.scala | 10 +++++++-
.../spark/sql/DataFrameAggregateSuite.scala | 24 ++++++++++++++++++--
3 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df0af82..c794ba8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1222,7 +1222,13 @@ object ReplaceDeduplicateWithAggregate extends
Rule[LogicalPlan] {
Alias(new First(attr).toAggregateExpression(),
attr.name)(attr.exprId)
}
}
- Aggregate(keys, aggCols, child)
+ // SPARK-22951: Physical aggregate operators distinguishes global
aggregation and grouping
+ // aggregations by checking the number of grouping keys. The key
difference here is that a
+ // global aggregation always returns at least one row even if there are
no input rows. Here
+ // we append a literal when the grouping key list is empty so that the
result aggregate
+ // operator is properly treated as a grouping aggregation.
+ val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+ Aggregate(nonemptyKeys, aggCols, child)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 0fa1aae..e9701ff 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -198,6 +198,14 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("add one grouping key if necessary when replace Deduplicate with
Aggregate") {
+ val input = LocalRelation()
+ val query = Deduplicate(Seq.empty, input) // dropDuplicates()
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = Aggregate(Seq(Literal(1)), input.output, input)
+ comparePlans(optimized, correctAnswer)
+ }
+
test("don't replace streaming Deduplicate") {
val input = LocalRelation(Seq('a.int, 'b.int), isStreaming = true)
val attrA = input.output(0)
http://git-wip-us.apache.org/repos/asf/spark/blob/eb4fa551/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 06848e4..e7776e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import scala.util.Random
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -27,7 +29,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData.DecimalData
-import org.apache.spark.sql.types.{Decimal, DecimalType}
+import org.apache.spark.sql.types.DecimalType
case class Fact(date: Int, hour: Int, minute: Int, room_name: String, temp:
Double)
@@ -456,7 +458,6 @@ class DataFrameAggregateSuite extends QueryTest with
SharedSQLContext {
test("null moments") {
val emptyTableData = Seq.empty[(Int, Int)].toDF("a", "b")
-
checkAnswer(
emptyTableData.agg(variance('a), var_samp('a), var_pop('a),
skewness('a), kurtosis('a)),
Row(null, null, null, null, null))
@@ -666,4 +667,23 @@ class DataFrameAggregateSuite extends QueryTest with
SharedSQLContext {
assert(exchangePlans.length == 1)
}
}
+
+ Seq(true, false).foreach { codegen =>
+ test("SPARK-22951: dropDuplicates on empty dataFrames should produce
correct aggregate " +
+ s"results when codegen is enabled: $codegen") {
+ withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, codegen.toString)) {
+ // explicit global aggregations
+ val emptyAgg = Map.empty[String, String]
+ checkAnswer(spark.emptyDataFrame.agg(emptyAgg), Seq(Row()))
+ checkAnswer(spark.emptyDataFrame.groupBy().agg(emptyAgg), Seq(Row()))
+ checkAnswer(spark.emptyDataFrame.groupBy().agg(count("*")),
Seq(Row(0)))
+ checkAnswer(spark.emptyDataFrame.dropDuplicates().agg(emptyAgg),
Seq(Row()))
+
checkAnswer(spark.emptyDataFrame.dropDuplicates().groupBy().agg(emptyAgg),
Seq(Row()))
+
checkAnswer(spark.emptyDataFrame.dropDuplicates().groupBy().agg(count("*")),
Seq(Row(0)))
+
+ // global aggregation is converted to grouping aggregation:
+ assert(spark.emptyDataFrame.dropDuplicates().count() == 0)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]