Repository: spark
Updated Branches:
  refs/heads/master edf02da38 -> 421382d0e


[SPARK-3824][SQL] Sets in-memory table default storage level to MEMORY_AND_DISK

Using `MEMORY_AND_DISK` as default storage level for in-memory table caching. 
Due to the in-memory columnar representation, recomputing the partitions of an 
in-memory cached table can be very expensive.

Author: Cheng Lian <[email protected]>

Closes #2686 from liancheng/spark-3824 and squashes the following commits:

35d2ed0 [Cheng Lian] Removes extra space
1ab7967 [Cheng Lian] Reduces test data size to fit DiskStore.getBytes()
ba565f0 [Cheng Lian] Makes CachedBatch serializable
07f0204 [Cheng Lian] Sets in-memory table default storage level to 
MEMORY_AND_DISK


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/421382d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/421382d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/421382d0

Branch: refs/heads/master
Commit: 421382d0e728940caa3e61bc11237c61f256378a
Parents: edf02da
Author: Cheng Lian <[email protected]>
Authored: Thu Oct 9 18:26:43 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Oct 9 18:26:43 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/CacheManager.scala    | 10 +++++++---
 .../spark/sql/columnar/InMemoryColumnarTableScan.scala    |  9 +++++----
 .../scala/org/apache/spark/sql/CachedTableSuite.scala     | 10 +++++-----
 3 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/421382d0/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3bf7382..5ab2b53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
 private case class CachedData(plan: LogicalPlan, cachedRepresentation: 
InMemoryRelation)
@@ -74,10 +74,14 @@ private[sql] trait CacheManager {
     cachedData.clear()
   }
 
-  /** Caches the data produced by the logical representation of the given 
schema rdd. */
+  /**
+   * Caches the data produced by the logical representation of the given 
schema rdd.  Unlike
+   * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` 
because recomputing
+   * the in-memory columnar representation of the underlying table is 
expensive.
+   */
   private[sql] def cacheQuery(
       query: SchemaRDD,
-      storageLevel: StorageLevel = MEMORY_ONLY): Unit = writeLock {
+      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
     val planToCache = query.queryExecution.optimizedPlan
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")

http://git-wip-us.apache.org/repos/asf/spark/blob/421382d0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 4f79173..22ab0e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -38,7 +38,7 @@ private[sql] object InMemoryRelation {
     new InMemoryRelation(child.output, useCompression, batchSize, 
storageLevel, child)()
 }
 
-private[sql] case class CachedBatch(buffers: Array[ByteBuffer], stats: Row)
+private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
@@ -91,7 +91,7 @@ private[sql] case class InMemoryRelation(
           val stats = Row.fromSeq(
             
columnBuilders.map(_.columnStats.collectedStatistics).foldLeft(Seq.empty[Any])(_
 ++ _))
 
-          CachedBatch(columnBuilders.map(_.build()), stats)
+          CachedBatch(columnBuilders.map(_.build().array()), stats)
         }
 
         def hasNext = rowIterator.hasNext
@@ -238,8 +238,9 @@ private[sql] case class InMemoryColumnarTableScan(
       def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
         val rows = cacheBatches.flatMap { cachedBatch =>
           // Build column accessors
-          val columnAccessors =
-            
requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
+          val columnAccessors = requestedColumnIndices.map { batch =>
+            ColumnAccessor(ByteBuffer.wrap(cachedBatch.buffers(batch)))
+          }
 
           // Extract rows via column accessors
           new Iterator[Row] {

http://git-wip-us.apache.org/repos/asf/spark/blob/421382d0/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index c87ded8..444bc95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, 
InMemoryRelation}
 import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{StorageLevel, RDDBlockId}
 
 case class BigData(s: String)
 
@@ -55,10 +55,10 @@ class CachedTableSuite extends QueryTest {
 
   test("too big for memory") {
     val data = "*" * 10000
-    sparkContext.parallelize(1 to 1000000, 1).map(_ => 
BigData(data)).registerTempTable("bigData")
-    cacheTable("bigData")
-    assert(table("bigData").count() === 1000000L)
-    uncacheTable("bigData")
+    sparkContext.parallelize(1 to 200000, 1).map(_ => 
BigData(data)).registerTempTable("bigData")
+    table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
+    assert(table("bigData").count() === 200000L)
+    table("bigData").unpersist()
   }
 
   test("calling .cache() should use in-memory columnar caching") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to