Repository: spark
Updated Branches:
refs/heads/branch-2.2 6764408f6 -> 5d10586a0
[SPARK-22076][SQL] Expand.projections should not be a Stream
## What changes were proposed in this pull request?
Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to https://github.com/apache/spark/pull/15484 , which
made `Expand.projections` a lazy `Stream` for group-by-cube queries.
In Scala 2.10 `Stream` captures a lot of enclosing state, and in this case it
captures the entire query plan, which has some unserializable parts.
This change is also good for the master branch, as it reduces the serialized
size of `Expand.projections`.
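The shape of the fix can be illustrated outside of Spark. The sketch below is a hypothetical, self-contained re-creation of the subset enumeration from `ResolveGroupingAnalytics`, with plain strings standing in for Catalyst `Expression`s: the recursive helper builds all grouping sets for a cube, and the public method forces the result with `toIndexedSeq` before it can be embedded in a plan, so no lazy sequence (and its enclosing closure) survives into serialization.

```scala
object CubeExprsSketch {
  // Recursive subset enumeration, mirroring the private helper in the patch.
  // Under Scala 2.10 the `++` of the two branches could produce a lazy
  // Stream whose closure captured far more than the elements themselves.
  def cubeExprs0[T](exprs: Seq[T]): Seq[Seq[T]] = exprs.toList match {
    case x :: xs =>
      val initial = cubeExprs0(xs)
      // Every subset either contains `x` (prepend it) or does not.
      initial.map(x +: _) ++ initial
    case Nil =>
      Seq(Seq.empty)
  }

  // The fix: materialize into a strict IndexedSeq (a Vector) up front.
  def cubeExprs[T](exprs: Seq[T]): Seq[Seq[T]] =
    cubeExprs0(exprs).toIndexedSeq

  def main(args: Array[String]): Unit = {
    // cube(a, b) expands to the grouping sets (a, b), (a), (b), ()
    println(cubeExprs(Seq("a", "b")))
  }
}
```

`CubeExprsSketch` and its type parameter are illustrative only; the real methods in `Analyzer.scala` operate on `Seq[Expression]` and live inside the analyzer rule.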
## How was this patch tested?
Manually verified with Spark built with Scala 2.10.
Author: Wenchen Fan <[email protected]>
Closes #19289 from cloud-fan/bug.
(cherry picked from commit ce6a71e013c403d0a3690cf823934530ce0ea5ef)
Signed-off-by: gatorsmile <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d10586a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d10586a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d10586a
Branch: refs/heads/branch-2.2
Commit: 5d10586a0065c6845e0e89afc5f22e09baa185b7
Parents: 6764408
Author: Wenchen Fan <[email protected]>
Authored: Wed Sep 20 09:00:43 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Sep 20 09:01:25 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5d10586a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c970c20..f707aa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -280,9 +280,15 @@ class Analyzer(
      * We need to get all of its subsets for a given GROUPBY expression, the subsets are
      * represented as sequence of expressions.
      */
-    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
+    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+      // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
+      // materialize it and avoid serialization problems later on.
+      cubeExprs0(exprs).toIndexedSeq
+    }
+
+    def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
       case x :: xs =>
-        val initial = cubeExprs(xs)
+        val initial = cubeExprs0(xs)
         initial.map(x +: _) ++ initial
       case Nil =>
         Seq(Seq.empty)