Repository: spark
Updated Branches:
  refs/heads/master e17901d6d -> ce6a71e01


[SPARK-22076][SQL] Expand.projections should not be a Stream

## What changes were proposed in this pull request?

Spark built with Scala 2.10 fails on a group by cube/rollup query such as:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```

The failure can be traced back to https://github.com/apache/spark/pull/15484, which 
made `Expand.projections` a lazy `Stream` for group by cube.

In Scala 2.10, a `Stream` captures a lot of surrounding state, and in this case it 
captures the entire query plan, which has some non-serializable parts.
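
As a rough illustration of the laziness involved, here is a minimal standalone sketch. It is not Spark code: `LazyStreamDemo`, `evaluated`, and `doubled` are hypothetical names, and it only shows that a mapped `Stream` defers work until forced, not the Scala 2.10 capture behaviour itself. It assumes a Scala version in which `Stream` is still available (it is deprecated from 2.13 on).

```scala
object LazyStreamDemo {
  // Counts how many elements have actually been computed so far.
  var evaluated = 0

  // Maps over a Stream. The remaining elements are not computed here;
  // the unevaluated tail keeps a reference to the mapping function instead.
  def doubled(xs: Seq[Int]): Seq[Int] =
    xs.toStream.map { x => evaluated += 1; x * 2 }

  def main(args: Array[String]): Unit = {
    val lazySeq = doubled(Seq(1, 2, 3))
    println(s"evaluated after map: $evaluated")

    // `toIndexedSeq` forces every element and returns a strict collection.
    val strict = lazySeq.toIndexedSeq
    println(s"evaluated after toIndexedSeq: $evaluated, result: $strict")
  }
}
```

Until something forces the `Stream`, serializing it means serializing those pending closures and everything they reference, which is the kind of problem described above.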

This change also benefits the master branch, because it reduces the serialized size of 
`Expand.projections`.
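
The materialization pattern can be sketched outside Spark as follows. This is an assumption-laden stand-in rather than the patched code: plain `String`s replace Catalyst `Expression`s, and `CubeSketch`, `subsets0`, and `subsets` are hypothetical names. The recursive helper builds every subset of the grouping expressions, and the public entry point calls `toIndexedSeq` once so callers always receive a strict collection.

```scala
object CubeSketch {
  // Recursive helper: returns all subsets of `exprs`.
  // Each step prepends the head to every subset of the tail, then appends
  // the subsets of the tail unchanged.
  private def subsets0(exprs: List[String]): Seq[Seq[String]] = exprs match {
    case x :: xs =>
      val initial = subsets0(xs)
      initial.map(x +: _) ++ initial
    case Nil =>
      Seq(Seq.empty)
  }

  // Entry point: materialize the result once, whatever Seq type the helper
  // happened to produce, so that no lazy structure escapes.
  def subsets(exprs: Seq[String]): IndexedSeq[Seq[String]] =
    subsets0(exprs.toList).toIndexedSeq

  def main(args: Array[String]): Unit = {
    // Two grouping expressions yield 2^2 = 4 grouping sets.
    subsets(Seq("a", "b")).foreach(s => println(s.mkString("(", ", ", ")")))
  }
}
```

Keeping the recursion in a private helper and materializing only at the boundary avoids copying the intermediate results at every recursive step.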

## How was this patch tested?

Manually verified using Spark built with Scala 2.10.

Author: Wenchen Fan <[email protected]>

Closes #19289 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce6a71e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce6a71e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce6a71e0

Branch: refs/heads/master
Commit: ce6a71e013c403d0a3690cf823934530ce0ea5ef
Parents: e17901d
Author: Wenchen Fan <[email protected]>
Authored: Wed Sep 20 09:00:43 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Sep 20 09:00:43 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce6a71e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index db276fb..4535176 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -279,9 +279,15 @@ class Analyzer(
      *  We need to get all of its subsets for a given GROUPBY expression, the subsets are
      *  represented as sequence of expressions.
      */
-    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
+    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+      // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
+      // materialize it and avoid serialization problems later on.
+      cubeExprs0(exprs).toIndexedSeq
+    }
+
+    def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
       case x :: xs =>
-        val initial = cubeExprs(xs)
+        val initial = cubeExprs0(xs)
         initial.map(x +: _) ++ initial
       case Nil =>
         Seq(Seq.empty)

