Repository: spark
Updated Branches:
  refs/heads/branch-2.0 50f6be759 -> a9165bb1b


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

This reverts commit 9ac68dbc5720026ea92acc61d295ca64d0d3d132. Turns out
the original fix was correct.

Original change description:
The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.

It seems that only the size in bytes of the data is actually used in the
driver, so instead just collect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be
distributed, so no individual executor should see too much memory
pressure.
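
As a rough sketch of the change (illustrative only, not the patch itself;
the object name, the local master, and the 4-bytes-per-Int figure are
assumptions for the example), the fix replaces a per-batch
CollectionAccumulator with a single LongAccumulator:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.LongAccumulator

    object SizeStatSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("SPARK-17549-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Reverted approach: collectionAccumulator[InternalRow] shipped one
        // stats row per cached batch to the driver, growing without bound.
        // Fixed approach: a LongAccumulator folds everything into one Long.
        val batchStats: LongAccumulator = sc.longAccumulator("batchStats")

        sc.parallelize(1 to 1000, 4).foreachPartition { iter =>
          // Each task adds only its partition's size in bytes
          // (assuming 4 bytes per Int for this example).
          batchStats.add(iter.size * 4L)
        }

        // Driver-side memory is constant: one Long, not one row per batch.
        assert(batchStats.value.longValue == 4000L)
        spark.stop()
      }
    }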

There are also potential improvements on the executor side, since the
data currently being stored is quite wasteful (e.g. storing boxed types
instead of primitive types for stats). But that's a separate issue.
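
As a rough illustration of the boxed-vs-primitive point (sizes are
approximate and JVM-dependent; this snippet is not from the patch):

    // An Array[java.lang.Long] stores 1000 references to separate heap
    // objects, each carrying its own object header (~16 bytes or more);
    // an Array[Long] is one flat block of 8000 bytes.
    val boxed: Array[java.lang.Long] = Array.fill(1000)(java.lang.Long.valueOf(0L))
    val primitive: Array[Long] = Array.fill(1000)(0L)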

Author: Marcelo Vanzin <[email protected]>

Closes #15304 from vanzin/SPARK-17549.2.

(cherry picked from commit 8d969a2125d915da1506c17833aa98da614a257f)
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9165bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9165bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9165bb1

Branch: refs/heads/branch-2.0
Commit: a9165bb1b704483ad16331945b0968cbb1a97139
Parents: 50f6be7
Author: Marcelo Vanzin <[email protected]>
Authored: Tue Oct 4 09:38:44 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Oct 4 09:39:50 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryRelation.scala   | 24 +++++---------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9165bb1/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator
 
 
 object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }
 
@@ -139,10 +127,10 @@ case class InMemoryRelation(
             rowCount += 1
           }
 
+          batchStats.add(totalSize)
+
           val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
             .flatMap(_.values))
-
-          batchStats.add(stats)
           CachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9165bb1/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 9378396..0daa29b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
+
+  test("SPARK-17549: cached table size should be correctly calculated") {
+    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
+    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+
+    // Materialize the data.
+    val expectedAnswer = data.collect()
+    checkAnswer(cached, expectedAnswer)
+
+    // Check that the right size was calculated.
+    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
+  }
+
 }

