Repository: spark
Updated Branches:
  refs/heads/branch-1.6 8ac8374a6 -> 0eb82133f


[SPARK-11792][SQL] SizeEstimator cannot provide a good size estimation of UnsafeHashedRelations

https://issues.apache.org/jira/browse/SPARK-11792

Right now, SizeEstimator will "think" a small UnsafeHashedRelation is several GBs.
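
The change adds an opt-in SizeEstimation trait (see the SizeEstimator.scala hunk below): an object that knows its own footprint reports it, and SizeEstimator falls back to its reflection-based walk only when no estimate is given. A minimal sketch of the opt-in pattern, where OffHeapBuffer and bytesUsed are hypothetical names (the trait is private[spark], so real implementors must live under org.apache.spark):

    import org.apache.spark.util.{SizeEstimation, SizeEstimator}

    // Hypothetical holder of memory that a reflective walk cannot measure well.
    class OffHeapBuffer(bytesUsed: Long) extends SizeEstimation {
      // Report the known footprint; returning None falls back to reflection.
      override def estimatedSize: Option[Long] = Some(bytesUsed)
    }

    SizeEstimator.estimate(new OffHeapBuffer(1024L))  // 1024, not a reflective guess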

Author: Yin Huai <[email protected]>

Closes #9788 from yhuai/SPARK-11792.

(cherry picked from commit 1714350bddd78cd1398e1a816f675ab729001081)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eb82133
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eb82133
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eb82133

Branch: refs/heads/branch-1.6
Commit: 0eb82133f11f2ce1f4f4a68cac3ad3c2b3089b64
Parents: 8ac8374
Author: Yin Huai <[email protected]>
Authored: Wed Nov 18 00:42:52 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Nov 18 00:42:58 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/memory/TaskMemoryManager.java  |  3 +++
 .../org/apache/spark/util/SizeEstimator.scala   | 26 +++++++++++++++++---
 .../apache/spark/util/SizeEstimatorSuite.scala  | 22 +++++++++++++++++
 .../sql/execution/joins/HashedRelation.scala    | 10 ++++++--
 4 files changed, 55 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0eb82133/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 5f743b2..d31eb44 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -215,6 +215,9 @@ public class TaskMemoryManager {
       logger.info(
         "{} bytes of memory were used by task {} but are not associated with 
specific consumers",
         memoryNotAccountedFor, taskAttemptId);
+      logger.info(
+        "{} bytes of memory are used for execution and {} bytes of memory are 
used for storage",
+        memoryManager.executionMemoryUsed(), 
memoryManager.storageMemoryUsed());
     }
   }
 
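The extra log line in this hunk looks like diagnostic support for the same investigation: when unaccounted task memory is reported, the manager now also records how the memory pool is split between execution and storage at that moment.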

http://git-wip-us.apache.org/repos/asf/spark/blob/0eb82133/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 23ee4ef..c3a2675 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -32,6 +32,16 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
+ * A trait that allows a class to give [[SizeEstimator]] more accurate size estimation.
+ * When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first.
+ * If `estimatedSize` does not return [[None]], [[SizeEstimator]] will use the returned size
+ * as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work.
+ */
+private[spark] trait SizeEstimation {
+  def estimatedSize: Option[Long]
+}
+
+/**
  * :: DeveloperApi ::
  * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
  * memory-aware caches.
@@ -199,10 +209,18 @@ object SizeEstimator extends Logging {
       // the size estimator since it references the whole REPL. Do nothing in this case. In
       // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
-      val classInfo = getClassInfo(cls)
-      state.size += alignSize(classInfo.shellSize)
-      for (field <- classInfo.pointerFields) {
-        state.enqueue(field.get(obj))
+      val estimatedSize = obj match {
+        case s: SizeEstimation => s.estimatedSize
+        case _ => None
+      }
+      if (estimatedSize.isDefined) {
+        state.size += estimatedSize.get
+      } else {
+        val classInfo = getClassInfo(cls)
+        state.size += alignSize(classInfo.shellSize)
+        for (field <- classInfo.pointerFields) {
+          state.enqueue(field.get(obj))
+        }
       }
     }
   }
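
Condensed, the new dispatch in visitSingleObject reads as the match below (a restatement of the hunk above, not the committed code). Note that when an object supplies its own estimate, its pointer fields are not enqueued, so the reported size is expected to cover everything the object owns:

    obj match {
      case s: SizeEstimation if s.estimatedSize.isDefined =>
        // Trust the self-reported size; do not walk this object's fields.
        state.size += s.estimatedSize.get
      case _ =>
        // Default path: aligned shell size plus a queue of reference fields.
        val classInfo = getClassInfo(cls)
        state.size += alignSize(classInfo.shellSize)
        for (field <- classInfo.pointerFields) {
          state.enqueue(field.get(obj))
        }
    }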

http://git-wip-us.apache.org/repos/asf/spark/blob/0eb82133/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 2055017..9b6261a 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -60,6 +60,18 @@ class DummyString(val arr: Array[Char]) {
   @transient val hash32: Int = 0
 }
 
+class DummyClass8 extends SizeEstimation {
+  val x: Int = 0
+
+  override def estimatedSize: Option[Long] = Some(2015)
+}
+
+class DummyClass9 extends SizeEstimation {
+  val x: Int = 0
+
+  override def estimatedSize: Option[Long] = None
+}
+
 class SizeEstimatorSuite
   extends SparkFunSuite
   with BeforeAndAfterEach
@@ -214,4 +226,14 @@ class SizeEstimatorSuite
     // Class should be 32 bytes on s390x if recognised as 64 bit platform
     assertResult(32)(SizeEstimator.estimate(new DummyClass7))
   }
+
+  test("SizeEstimation can provide the estimated size") {
+    // DummyClass8 provides its size estimation.
+    assertResult(2015)(SizeEstimator.estimate(new DummyClass8))
+    assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8)))
+
+    // DummyClass9 does not provide its size estimation.
+    assertResult(16)(SizeEstimator.estimate(new DummyClass9))
+    assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass9)))
+  }
 }
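
For reference, the array expectations appear to decompose on a 64-bit JVM with compressed oops as 10 * 2015 = 20150 for the self-reported DummyClass8 instances plus 56 bytes for the Array itself (a 16-byte array header plus 10 * 4-byte references, totaling 56), i.e. 20206; likewise 10 * 16 + 56 = 216 for DummyClass9, whose instances fall back to the reflective 16-byte estimate.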

http://git-wip-us.apache.org/repos/asf/spark/blob/0eb82133/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index cc8abb1..49ae09b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.unsafe.memory.MemoryLocation
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SizeEstimation, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.{SparkConf, SparkEnv}
 
@@ -189,7 +189,9 @@ private[execution] object HashedRelation {
  */
 private[joins] final class UnsafeHashedRelation(
     private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
-  extends HashedRelation with Externalizable {
+  extends HashedRelation
+  with SizeEstimation
+  with Externalizable {
 
   private[joins] def this() = this(null)  // Needed for serialization
 
@@ -215,6 +217,10 @@ private[joins] final class UnsafeHashedRelation(
     }
   }
 
+  override def estimatedSize: Option[Long] = {
+    Option(binaryMap).map(_.getTotalMemoryConsumption)
+  }
+
   override def get(key: InternalRow): Seq[InternalRow] = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
 
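The Option(binaryMap) guard covers both in-memory representations of the relation: while it is still backed by the JavaHashMap (binaryMap == null, e.g. on the driver before serialization), estimatedSize returns None and SizeEstimator performs its usual reflective walk; once deserialization has populated binaryMap (a BytesToBytesMap), getTotalMemoryConsumption reports the map's page-backed footprint directly, which is presumably the quantity the reflective walk had been inflating to several GBs.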

