Repository: spark
Updated Branches:
  refs/heads/branch-2.2 81232ce03 -> 8a4e7dd89


[SPARK-22206][SQL][SPARKR] gapply in R can't work on empty grouping columns

## What changes were proposed in this pull request?

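`FlatMapGroupsInRExec.requiredChildDistribution` didn't consider empty
grouping attributes: it always required
`ClusteredDistribution(groupingAttributes)`. This causes a problem when
running `EnsureRequirements`, so `gapply` in R can't work on empty
grouping columns. This patch requires `AllTuples` when the grouping
attributes are empty.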
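
As a rough illustration of the intended behavior, the selection logic can be sketched in isolation. The types below are simplified stand-ins defined only for this sketch (grouping attributes are plain strings, and the `require` on the clustering list is an assumption used to model why an empty list is a problem); they are not the actual Catalyst classes.

```scala
// Simplified stand-ins for illustration only; NOT the real Spark/Catalyst classes.
sealed trait Distribution

// Stand-in for "all rows must be co-located in a single partition".
case object AllTuples extends Distribution

// Stand-in for "rows sharing the same clustering keys must be co-located".
// Assumption of this sketch: an empty clustering list is rejected.
final case class ClusteredDistribution(clustering: Seq[String]) extends Distribution {
  require(clustering.nonEmpty, "clustering must not be empty; use AllTuples instead")
}

object RequiredDistributionSketch {
  // Same shape as the patched requiredChildDistribution:
  // fall back to AllTuples when there is nothing to cluster on.
  def requiredChildDistribution(groupingAttributes: Seq[String]): Seq[Distribution] =
    if (groupingAttributes.isEmpty) {
      AllTuples :: Nil
    } else {
      ClusteredDistribution(groupingAttributes) :: Nil
    }
}
```

With empty grouping attributes the sketch yields `AllTuples`, so every row lands in one group; with a non-empty list it behaves as before the patch.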

## How was this patch tested?

Added a test for `gapply` on empty grouping columns to `test_sparkSQL.R`.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #19436 from viirya/fix-flatmapinr-distribution.

(cherry picked from commit ae61f187aa0471242c046fdeac6ed55b9b98a3f6)
Signed-off-by: hyukjinkwon <gurwls...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a4e7dd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a4e7dd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a4e7dd8

Branch: refs/heads/branch-2.2
Commit: 8a4e7dd896be20c560097c88ffd79c0d6c30d017
Parents: 81232ce
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Thu Oct 5 23:36:18 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Thu Oct 5 23:36:56 2017 +0900

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R                          | 5 +++++
 .../main/scala/org/apache/spark/sql/execution/objects.scala    | 6 +++++-
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a4e7dd8/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index fc69b4d..12d8fef 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2740,6 +2740,11 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
   df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
   expect_identical(df1Collect, expected)
 
+  # gapply on empty grouping columns.
+  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+  actual <- collect(df1)
+  expect_identical(actual, expected)
+
   # Computes the sum of second column by grouping on the first and third columns
   # and checks if the sum is larger than 2
   schema <- structType(structField("a", "integer"), structField("e", "boolean"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8a4e7dd8/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 3439181..3643ef3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -397,7 +397,11 @@ case class FlatMapGroupsInRExec(
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(groupingAttributes) :: Nil
+    if (groupingAttributes.isEmpty) {
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(groupingAttributes) :: Nil
+    }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))


