Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e76f4f47f -> 2bd37ce96


[SPARK-17549][SQL] Revert "[SPARK-17549][SQL] Only collect table size stat in
driver for cached relation."

This reverts commit 39e2bad6a866d27c3ca594d15e574a1da3ee84cc because of the 
problem mentioned at 
https://issues.apache.org/jira/browse/SPARK-17549?focusedCommentId=15505060&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15505060

Author: Yin Huai <yh...@databricks.com>

Closes #15157 from yhuai/revert-SPARK-17549.

(cherry picked from commit 9ac68dbc5720026ea92acc61d295ca64d0d3d132)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bd37ce9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bd37ce9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bd37ce9

Branch: refs/heads/branch-2.0
Commit: 2bd37ce9603b2cd95b4f462edd60d7b62174e730
Parents: e76f4f4
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Sep 20 11:53:57 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Sep 20 11:54:13 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryRelation.scala   | 24 +++++++++++++++-----
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ------------
 2 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bd37ce9/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 56bd5c1..479934a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.CollectionAccumulator
 
 
 object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: CollectionAccumulator[InternalRow] =
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -71,12 +74,21 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value == 0L) {
+    if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      // Underlying columnar RDD has been materialized, required information has also been
+      // collected via the `batchStats` accumulator.
+      val sizeOfRow: Expression =
+        BindReferences.bindReference(
+          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+          partitionStatistics.schema)
+
+      val sizeInBytes =
+        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+      Statistics(sizeInBytes = sizeInBytes)
     }
   }
 
@@ -127,10 +139,10 @@ case class InMemoryRelation(
             rowCount += 1
           }
 
-          batchStats.add(totalSize)
-
           val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
             .flatMap(_.values))
+
+          batchStats.add(stats)
           CachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bd37ce9/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0daa29b..9378396 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
-
-  test("SPARK-17549: cached table size should be correctly calculated") {
-    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
-    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
-
-    // Materialize the data.
-    val expectedAnswer = data.collect()
-    checkAnswer(cached, expectedAnswer)
-
-    // Check that the right size was calculated.
-    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to