Repository: spark
Updated Branches:
  refs/heads/master d98d1cb00 -> 5aca6ad00


[SPARK-11767] [SQL] limit the size of cached batch

Currently the size of a cached batch is only controlled by `batchSize` (default 
value is 10000), which does not work well with the size of serialized columns 
(for example, complex types). The memory used to build the batch is not 
accounted for, so it's easy to OOM (especially after unified memory management).

This PR introduces a hard limit of 4 MB for the total size of all columns (up to 
50 columns of uncompressed primitive columns).

This also changes the way the buffer grows: it is doubled each time, then 
trimmed once finished.

cc liancheng

Author: Davies Liu <[email protected]>

Closes #9760 from davies/cache_limit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5aca6ad0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5aca6ad0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5aca6ad0

Branch: refs/heads/master
Commit: 5aca6ad00c9d7fa43c725b8da4a10114a3a77421
Parents: d98d1cb
Author: Davies Liu <[email protected]>
Authored: Tue Nov 17 12:50:01 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Tue Nov 17 12:50:01 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/columnar/ColumnBuilder.scala   | 12 ++++++++++--
 .../org/apache/spark/sql/columnar/ColumnStats.scala     |  2 +-
 .../spark/sql/columnar/InMemoryColumnarTableScan.scala  |  6 +++++-
 3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 7a7345a..599f30f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -73,6 +73,13 @@ private[sql] class BasicColumnBuilder[JvmType](
   }
 
   override def build(): ByteBuffer = {
+    if (buffer.capacity() > buffer.position() * 1.1) {
+      // trim the buffer
+      buffer = ByteBuffer
+        .allocate(buffer.position())
+        .order(ByteOrder.nativeOrder())
+        .put(buffer.array(), 0, buffer.position())
+    }
     buffer.flip().asInstanceOf[ByteBuffer]
   }
 }
@@ -129,7 +136,8 @@ private[sql] class MapColumnBuilder(dataType: MapType)
   extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))
 
 private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
+  val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
+  val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L
 
   private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
     if (orig.remaining >= size) {
@@ -137,7 +145,7 @@ private[sql] object ColumnBuilder {
     } else {
       // grow in steps of initial size
       val capacity = orig.capacity()
-      val newSize = capacity + size.max(capacity / 8 + 1)
+      val newSize = capacity + size.max(capacity)
       val pos = orig.position()
 
       ByteBuffer

http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index ba61003..91a0565 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -48,7 +48,7 @@ private[sql] class PartitionStatistics(tableSchema: 
Seq[Attribute]) extends Seri
 private[sql] sealed trait ColumnStats extends Serializable {
   protected var count = 0
   protected var nullCount = 0
-  protected var sizeInBytes = 0L
+  private[sql] var sizeInBytes = 0L
 
   /**
    * Gathers statistics information from `row(ordinal)`.

http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 2cface6..ae77298 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -133,7 +133,9 @@ private[sql] case class InMemoryRelation(
           }.toArray
 
           var rowCount = 0
-          while (rowIterator.hasNext && rowCount < batchSize) {
+          var totalSize = 0L
+          while (rowIterator.hasNext && rowCount < batchSize
+            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
             val row = rowIterator.next()
 
             // Added for SPARK-6082. This assertion can be useful for 
scenarios when something
@@ -147,8 +149,10 @@ private[sql] case class InMemoryRelation(
                 s"\nRow content: $row")
 
             var i = 0
+            totalSize = 0
             while (i < row.numFields) {
               columnBuilders(i).appendFrom(row, i)
+              totalSize += columnBuilders(i).columnStats.sizeInBytes
               i += 1
             }
             rowCount += 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to