Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d7467484f -> 39cfa9c0b


[SPARK-1994][SQL] Weird data corruption bug when running Spark SQL on data in 
HDFS

Basically there is a race condition (possibly a scala bug?) when these values 
are recomputed on all of the slaves that results in an incorrect projection 
being generated (possibly because the GUID uniqueness contract is broken?).

In general we should probably enforce that all expression planing occurs on the 
driver, as is now occurring here.

Author: Michael Armbrust <[email protected]>

Closes #1004 from marmbrus/fixAggBug and squashes the following commits:

e0c116c [Michael Armbrust] Compute aggregate expression during planning instead 
of lazily on workers.

(cherry picked from commit a6c72ab16e7a3027739ab419819f5222e270838e)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39cfa9c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39cfa9c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39cfa9c0

Branch: refs/heads/branch-1.0
Commit: 39cfa9c0be34d4baf9de4eb9f9191c7b406c4d59
Parents: d746748
Author: Michael Armbrust <[email protected]>
Authored: Sat Jun 7 14:20:33 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jun 7 14:21:15 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/Aggregate.scala   | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39cfa9c0/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 604914e..34d88fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -77,8 +77,7 @@ case class Aggregate(
       resultAttribute: AttributeReference)
 
   /** A list of aggregates that need to be computed for each group. */
-  @transient
-  private[this] lazy val computedAggregates = aggregateExpressions.flatMap { 
agg =>
+  private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
     agg.collect {
       case a: AggregateExpression =>
         ComputedAggregate(
@@ -89,8 +88,7 @@ case class Aggregate(
   }.toArray
 
   /** The schema of the result of all aggregate evaluations */
-  @transient
-  private[this] lazy val computedSchema = 
computedAggregates.map(_.resultAttribute)
+  private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
 
   /** Creates a new aggregate buffer for a group. */
   private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
@@ -104,8 +102,7 @@ case class Aggregate(
   }
 
   /** Named attributes used to substitute grouping attributes into the final 
result. */
-  @transient
-  private[this] lazy val namedGroups = groupingExpressions.map {
+  private[this] val namedGroups = groupingExpressions.map {
     case ne: NamedExpression => ne -> ne.toAttribute
     case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
   }
@@ -114,16 +111,14 @@ case class Aggregate(
    * A map of substitutions that are used to insert the aggregate expressions 
and grouping
    * expression into the final result expression.
    */
-  @transient
-  private[this] lazy val resultMap =
+  private[this] val resultMap =
     (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ 
namedGroups).toMap
 
   /**
    * Substituted version of aggregateExpressions expressions which are used to 
compute final
    * output rows given a group and the result of all aggregate computations.
    */
-  @transient
-  private[this] lazy val resultExpressions = aggregateExpressions.map { agg =>
+  private[this] val resultExpressions = aggregateExpressions.map { agg =>
     agg.transform {
       case e: Expression if resultMap.contains(e) => resultMap(e)
     }

Reply via email to