Repository: spark
Updated Branches:
  refs/heads/master 5e2b44474 -> 1714350bd
[SPARK-11792][SQL] SizeEstimator cannot provide a good size estimation of UnsafeHashedRelations https://issues.apache.org/jira/browse/SPARK-11792 Right now, SizeEstimator will "think" a small UnsafeHashedRelation is several GBs. Author: Yin Huai <[email protected]> Closes #9788 from yhuai/SPARK-11792. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1714350b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1714350b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1714350b Branch: refs/heads/master Commit: 1714350bddd78cd1398e1a816f675ab729001081 Parents: 5e2b444 Author: Yin Huai <[email protected]> Authored: Wed Nov 18 00:42:52 2015 -0800 Committer: Reynold Xin <[email protected]> Committed: Wed Nov 18 00:42:52 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/memory/TaskMemoryManager.java | 3 +++ .../org/apache/spark/util/SizeEstimator.scala | 26 +++++++++++++++++--- .../apache/spark/util/SizeEstimatorSuite.scala | 22 +++++++++++++++++ .../sql/execution/joins/HashedRelation.scala | 10 ++++++-- 4 files changed, 55 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1714350b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 5f743b2..d31eb44 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -215,6 +215,9 @@ public class TaskMemoryManager { logger.info( "{} bytes of memory were used by task {} but are not associated with specific consumers", memoryNotAccountedFor, taskAttemptId); + logger.info( + "{} 
bytes of memory are used for execution and {} bytes of memory are used for storage", + memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/1714350b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 23ee4ef..c3a2675 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -32,6 +32,16 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.collection.OpenHashSet /** + * A trait that allows a class to give [[SizeEstimator]] more accurate size estimation. + * When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first. + * If `estimatedSize` does not return [[None]], [[SizeEstimator]] will use the returned size + * as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work. + */ +private[spark] trait SizeEstimation { + def estimatedSize: Option[Long] +} + +/** * :: DeveloperApi :: * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in * memory-aware caches. @@ -199,10 +209,18 @@ object SizeEstimator extends Logging { // the size estimator since it references the whole REPL. Do nothing in this case. In // general all ClassLoaders and Classes will be shared between objects anyway. 
} else { - val classInfo = getClassInfo(cls) - state.size += alignSize(classInfo.shellSize) - for (field <- classInfo.pointerFields) { - state.enqueue(field.get(obj)) + val estimatedSize = obj match { + case s: SizeEstimation => s.estimatedSize + case _ => None + } + if (estimatedSize.isDefined) { + state.size += estimatedSize.get + } else { + val classInfo = getClassInfo(cls) + state.size += alignSize(classInfo.shellSize) + for (field <- classInfo.pointerFields) { + state.enqueue(field.get(obj)) + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1714350b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 2055017..9b6261a 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -60,6 +60,18 @@ class DummyString(val arr: Array[Char]) { @transient val hash32: Int = 0 } +class DummyClass8 extends SizeEstimation { + val x: Int = 0 + + override def estimatedSize: Option[Long] = Some(2015) +} + +class DummyClass9 extends SizeEstimation { + val x: Int = 0 + + override def estimatedSize: Option[Long] = None +} + class SizeEstimatorSuite extends SparkFunSuite with BeforeAndAfterEach @@ -214,4 +226,14 @@ class SizeEstimatorSuite // Class should be 32 bytes on s390x if recognised as 64 bit platform assertResult(32)(SizeEstimator.estimate(new DummyClass7)) } + + test("SizeEstimation can provide the estimated size") { + // DummyClass8 provides its size estimation. + assertResult(2015)(SizeEstimator.estimate(new DummyClass8)) + assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8))) + + // DummyClass9 does not provide its size estimation. 
+ assertResult(16)(SizeEstimator.estimate(new DummyClass9)) + assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass9))) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1714350b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index cc8abb1..49ae09b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.unsafe.memory.MemoryLocation -import org.apache.spark.util.Utils +import org.apache.spark.util.{SizeEstimation, Utils} import org.apache.spark.util.collection.CompactBuffer import org.apache.spark.{SparkConf, SparkEnv} @@ -189,7 +189,9 @@ private[execution] object HashedRelation { */ private[joins] final class UnsafeHashedRelation( private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]]) - extends HashedRelation with Externalizable { + extends HashedRelation + with SizeEstimation + with Externalizable { private[joins] def this() = this(null) // Needed for serialization @@ -215,6 +217,10 @@ private[joins] final class UnsafeHashedRelation( } } + override def estimatedSize: Option[Long] = { + Option(binaryMap).map(_.getTotalMemoryConsumption) + } + override def get(key: InternalRow): Seq[InternalRow] = { val unsafeKey = key.asInstanceOf[UnsafeRow] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
