Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dff3d75db -> 214676d29


[SPARK-16163] [SQL] Cache the statistics for logical plans

## What changes were proposed in this pull request?

This calculation of statistics is not trivial anymore, it could be very slow on 
large query (for example, TPC-DS Q64 took several minutes to plan).

During the planning of a query, the statistics of any logical plan should not 
change (even InMemoryRelation), so we should use `lazy val` to cache the 
statistics.

For InMemoryRelation, the statistics could be updated after materialization, 
it's only useful when used in another query (before planning), because once we 
finished the planning, the statistics will not be used anymore.

## How was this patch tested?

Testsed with TPC-DS Q64, it could be planned in a second after the patch.

Author: Davies Liu <[email protected]>

Closes #13871 from davies/fix_statistics.

(cherry picked from commit 10396d9505c752cc18b6424f415d4ff0f460ad65)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/214676d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/214676d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/214676d2

Branch: refs/heads/branch-2.0
Commit: 214676d29d3d66c3e37ab6ff9fae70adb056b8b2
Parents: dff3d75
Author: Davies Liu <[email protected]>
Authored: Thu Jun 23 11:48:48 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Thu Jun 23 11:49:02 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 20 +++---
 .../execution/columnar/InMemoryRelation.scala   | 65 ++++++--------------
 .../scala/org/apache/spark/sql/QueryTest.scala  |  3 +-
 3 files changed, 30 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/214676d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ff3dcbc..79f9a21 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -159,7 +159,7 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) 
extends SetOperation
     }
   }
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     val leftSize = left.statistics.sizeInBytes
     val rightSize = right.statistics.sizeInBytes
     val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
@@ -184,7 +184,7 @@ case class Except(left: LogicalPlan, right: LogicalPlan) 
extends SetOperation(le
       left.output.zip(right.output).forall { case (l, r) => l.dataType == 
r.dataType } &&
       duplicateResolved
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     left.statistics.copy()
   }
 }
@@ -224,7 +224,7 @@ case class Union(children: Seq[LogicalPlan]) extends 
LogicalPlan {
     children.length > 1 && childrenResolved && allChildrenCompatible
   }
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
     Statistics(sizeInBytes = sizeInBytes)
   }
@@ -333,7 +333,7 @@ case class Join(
     case _ => resolvedExceptNatural
   }
 
-  override def statistics: Statistics = joinType match {
+  override lazy val statistics: Statistics = joinType match {
     case LeftAnti | LeftSemi =>
       // LeftSemi and LeftAnti won't ever be bigger than left
       left.statistics.copy()
@@ -351,7 +351,7 @@ case class BroadcastHint(child: LogicalPlan) extends 
UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   // set isBroadcastable to true so the child will be broadcasted
-  override def statistics: Statistics = super.statistics.copy(isBroadcastable 
= true)
+  override lazy val statistics: Statistics = 
super.statistics.copy(isBroadcastable = true)
 }
 
 case class InsertIntoTable(
@@ -451,7 +451,7 @@ case class Range(
 
   override def newInstance(): Range = copy(output = 
output.map(_.newInstance()))
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
   }
@@ -486,7 +486,7 @@ case class Aggregate(
   override def validConstraints: Set[Expression] =
     child.constraints.union(getAliasedConstraints(aggregateExpressions))
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     if (groupingExpressions.isEmpty) {
       super.statistics.copy(sizeInBytes = 1)
     } else {
@@ -586,7 +586,7 @@ case class Expand(
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     val sizeInBytes = super.statistics.sizeInBytes * projections.length
     Statistics(sizeInBytes = sizeInBytes)
   }
@@ -706,7 +706,7 @@ case class Sample(
 
   override def output: Seq[Attribute] = child.output
 
-  override def statistics: Statistics = {
+  override lazy val statistics: Statistics = {
     val ratio = upperBound - lowerBound
     // BigInt can't multiply with Double
     var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
@@ -753,5 +753,5 @@ case object OneRowRelation extends LeafNode {
    *
    * [[LeafNode]]s must override this.
    */
-  override def statistics: Statistics = Statistics(sizeInBytes = 1)
+  override lazy val statistics: Statistics = Statistics(sizeInBytes = 1)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/214676d2/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index c546d4b..02866c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -63,58 +63,32 @@ private[sql] case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: CollectionAccumulator[InternalRow] = null)
+    private[sql] val batchStats: CollectionAccumulator[InternalRow] =
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: CollectionAccumulator[InternalRow] =
-    if (_batchStats == null) {
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow]
-    } else {
-      _batchStats
-    }
-
   @transient val partitionStatistics = new PartitionStatistics(output)
 
-  private def computeSizeInBytes = {
-    val sizeOfRow: Expression =
-      BindReferences.bindReference(
-        output.map(a => 
partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-        partitionStatistics.schema)
-
-    batchStats.value.asScala.map(row => 
sizeOfRow.eval(row).asInstanceOf[Long]).sum
-  }
-
-  // Statistics propagation contracts:
-  // 1. Non-null `_statistics` must reflect the actual statistics of the 
underlying data
-  // 2. Only propagate statistics when `_statistics` is non-null
-  private def statisticsToBePropagated = if (_statistics == null) {
-    val updatedStats = statistics
-    if (_statistics == null) null else updatedStats
-  } else {
-    _statistics
-  }
-
-  override def statistics: Statistics = {
-    if (_statistics == null) {
-      if (batchStats.value.isEmpty) {
-        // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
-        // available, return the default statistics.
-        Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
-      } else {
-        // Underlying columnar RDD has been materialized, required information 
has also been
-        // collected via the `batchStats` accumulator, compute the final 
statistics,
-        // and update `_statistics`.
-        _statistics = Statistics(sizeInBytes = computeSizeInBytes)
-        _statistics
-      }
+  override lazy val statistics: Statistics = {
+    if (batchStats.value.isEmpty) {
+      // Underlying columnar RDD hasn't been materialized, no useful 
statistics information
+      // available, return the default statistics.
+      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Pre-computed statistics
-      _statistics
+      // Underlying columnar RDD has been materialized, required information 
has also been
+      // collected via the `batchStats` accumulator.
+      val sizeOfRow: Expression =
+        BindReferences.bindReference(
+          output.map(a => 
partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+          partitionStatistics.schema)
+
+      val sizeInBytes =
+        batchStats.value.asScala.map(row => 
sizeOfRow.eval(row).asInstanceOf[Long]).sum
+      Statistics(sizeInBytes = sizeInBytes)
     }
   }
 
@@ -187,7 +161,7 @@ private[sql] case class InMemoryRelation(
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
       newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-        _cachedColumnBuffers, statisticsToBePropagated, batchStats)
+        _cachedColumnBuffers, batchStats)
   }
 
   override def newInstance(): this.type = {
@@ -199,12 +173,11 @@ private[sql] case class InMemoryRelation(
       child,
       tableName)(
         _cachedColumnBuffers,
-        statisticsToBePropagated,
         batchStats).asInstanceOf[this.type]
   }
 
   def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
 
   override protected def otherCopyArgs: Seq[AnyRef] =
-    Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
+    Seq(_cachedColumnBuffers, batchStats)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/214676d2/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 742f036..b15f38c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -323,8 +323,7 @@ abstract class QueryTest extends PlanTest {
           origin.child,
           l.tableName)(
           origin.cachedColumnBuffers,
-          l._statistics,
-          origin._batchStats)
+          origin.batchStats)
       case p =>
         p.transformExpressions {
           case s: SubqueryExpression =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to