Repository: spark Updated Branches: refs/heads/master d98d1cb00 -> 5aca6ad00
[SPARK-11767] [SQL] limit the size of caced batch Currently the size of cached batch in only controlled by `batchSize` (default value is 10000), which does not work well with the size of serialized columns (for example, complex types). The memory used to build the batch is not accounted, it's easy to OOM (especially after unified memory management). This PR introduce a hard limit as 4M for total columns (up to 50 columns of uncompressed primitive columns). This also change the way to grow buffer, double it each time, then trim it once finished. cc liancheng Author: Davies Liu <[email protected]> Closes #9760 from davies/cache_limit. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5aca6ad0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5aca6ad0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5aca6ad0 Branch: refs/heads/master Commit: 5aca6ad00c9d7fa43c725b8da4a10114a3a77421 Parents: d98d1cb Author: Davies Liu <[email protected]> Authored: Tue Nov 17 12:50:01 2015 -0800 Committer: Davies Liu <[email protected]> Committed: Tue Nov 17 12:50:01 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/columnar/ColumnBuilder.scala | 12 ++++++++++-- .../org/apache/spark/sql/columnar/ColumnStats.scala | 2 +- .../spark/sql/columnar/InMemoryColumnarTableScan.scala | 6 +++++- 3 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 7a7345a..599f30f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -73,6 +73,13 @@ private[sql] class BasicColumnBuilder[JvmType]( } override def build(): ByteBuffer = { + if (buffer.capacity() > buffer.position() * 1.1) { + // trim the buffer + buffer = ByteBuffer + .allocate(buffer.position()) + .order(ByteOrder.nativeOrder()) + .put(buffer.array(), 0, buffer.position()) + } buffer.flip().asInstanceOf[ByteBuffer] } } @@ -129,7 +136,8 @@ private[sql] class MapColumnBuilder(dataType: MapType) extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType)) private[sql] object ColumnBuilder { - val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024 + val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024 + val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { if (orig.remaining >= size) { @@ -137,7 +145,7 @@ private[sql] object ColumnBuilder { } else { // grow in steps of initial size val capacity = orig.capacity() - val newSize = capacity + size.max(capacity / 8 + 1) + val newSize = capacity + size.max(capacity) val pos = orig.position() ByteBuffer http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala index ba61003..91a0565 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala @@ -48,7 +48,7 @@ private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Seri private[sql] sealed trait ColumnStats extends Serializable { protected var count = 0 protected var nullCount = 0 - protected var sizeInBytes = 0L + private[sql] var sizeInBytes = 0L /** * Gathers statistics information from `row(ordinal)`. http://git-wip-us.apache.org/repos/asf/spark/blob/5aca6ad0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 2cface6..ae77298 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -133,7 +133,9 @@ private[sql] case class InMemoryRelation( }.toArray var rowCount = 0 - while (rowIterator.hasNext && rowCount < batchSize) { + var totalSize = 0L + while (rowIterator.hasNext && rowCount < batchSize + && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { val row = rowIterator.next() // Added for SPARK-6082. This assertion can be useful for scenarios when something @@ -147,8 +149,10 @@ private[sql] case class InMemoryRelation( s"\nRow content: $row") var i = 0 + totalSize = 0 while (i < row.numFields) { columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes i += 1 } rowCount += 1 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
