Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6764408f6 -> 5d10586a0


[SPARK-22076][SQL] Expand.projections should not be a Stream

## What changes were proposed in this pull request?

Spark built with Scala 2.10 fails on a group by with cube or rollup, for example:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```

It can be traced back to https://github.com/apache/spark/pull/15484 , which 
made `Expand.projections` a lazy `Stream` for group by cube.

In Scala 2.10, a `Stream` captures much of its surrounding context; in this 
case it captures the entire query plan, which has some non-serializable parts.
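
The laziness behind this can be illustrated with a small standalone sketch. It is not Spark code: the object name `StreamMaterializeSketch`, the `run` helper, and the integer "projections" are hypothetical stand-ins, and it assumes Scala 2.10 to 2.12, where `Stream` is the standard lazy sequence (it is deprecated in 2.13). It only shows that a mapped `Stream` defers work to closures held in its tail, and that `toIndexedSeq` forces every element into a strict collection.

```scala
object StreamMaterializeSketch {

  /** Returns (elements computed before materializing,
    * elements computed after materializing, the strict result). */
  def run(): (Int, Int, IndexedSeq[Seq[Int]]) = {
    var evaluated = 0

    // Mapping over a Stream is lazy: the tail is a closure that still
    // references everything the mapping function captured.
    val lazyProjections = Stream(1, 2, 3).map { i =>
      evaluated += 1
      Seq(i)
    }
    val before = evaluated

    // toIndexedSeq walks the whole Stream and copies the elements into a
    // strict collection that no longer depends on those closures.
    val strict = lazyProjections.toIndexedSeq
    (before, evaluated, strict)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, strict) = run()
    println(s"computed before materializing: $before")
    println(s"computed after materializing:  $after")
    println(s"strict result: $strict")
  }
}
```

The patch applies the same idea: the recursive helper is left as it is, and its result is materialized once with `toIndexedSeq` at the boundary.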

This change also benefits the master branch, since it reduces the serialized 
size of `Expand.projections`.

## How was this patch tested?

Manually verified with Spark built with Scala 2.10.

Author: Wenchen Fan <[email protected]>

Closes #19289 from cloud-fan/bug.

(cherry picked from commit ce6a71e013c403d0a3690cf823934530ce0ea5ef)
Signed-off-by: gatorsmile <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d10586a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d10586a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d10586a

Branch: refs/heads/branch-2.2
Commit: 5d10586a0065c6845e0e89afc5f22e09baa185b7
Parents: 6764408
Author: Wenchen Fan <[email protected]>
Authored: Wed Sep 20 09:00:43 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Sep 20 09:01:25 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d10586a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c970c20..f707aa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -280,9 +280,15 @@ class Analyzer(
     *  We need to get all of its subsets for a given GROUPBY expression, the subsets are
      *  represented as sequence of expressions.
      */
-    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
+    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+      // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
+      // materialize it and avoid serialization problems later on.
+      cubeExprs0(exprs).toIndexedSeq
+    }
+
+    def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
       case x :: xs =>
-        val initial = cubeExprs(xs)
+        val initial = cubeExprs0(xs)
         initial.map(x +: _) ++ initial
       case Nil =>
         Seq(Seq.empty)

