This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7258f69 [SPARK-35396] Add AutoCloseable close to BlockManager and
InMemoryRelation
7258f69 is described below
commit 7258f691887aedcf7ba3eb4e478d67a5637643b9
Author: Chendi Xue <[email protected]>
AuthorDate: Tue May 25 08:55:25 2021 -0500
[SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation
This PR proposes an add-on to support manually closing entries in
MemoryStore and InMemoryRelation
### What changes were proposed in this pull request?
Currently:
MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all
OnHeap or OffHeap entries.
And when memoryStore.remove(blockId) is called, codes will simply remove
one entry from LinkedHashMap and leverage Java GC to do release work.
This PR:
We are proposing an add-on to manually close any object stored in
MemoryStore and InMemoryRelation if this object extends AutoCloseable.
Verification:
In our own use case, we implemented a user-defined
off-heap-hashRelation for BHJ, and we verified that by adding this manual
close, we can make sure our defined off-heap-hashRelation can be released when
evict is called.
Also, with this PR, we implemented a user-defined cachedBatch that will be
released when InMemoryRelation.clearCache() is called
### Why are the changes needed?
This change can help clean up off-heap user-defined objects that may be
cached in InMemoryRelation or MemoryStore
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
WIP
Signed-off-by: Chendi Xue <chendi.xue@intel.com>
Closes #32534 from xuechendi/support_manual_close_in_memorystore.
Authored-by: Chendi Xue <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../apache/spark/storage/memory/MemoryStore.scala | 32 ++++++-
.../apache/spark/storage/MemoryStoreSuite.scala | 100 +++++++++++++++++++++
2 files changed, 131 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 375d05b..2079b26 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,9 +21,14 @@ import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.LinkedHashMap
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
import scala.reflect.ClassTag
+import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
import com.google.common.io.ByteStreams
@@ -385,9 +390,33 @@ private[spark] class MemoryStore(
}
}
/**
 * Closes the value held by `entry` if it implements [[AutoCloseable]],
 * releasing any user-managed (e.g. off-heap) resources it owns.
 *
 * The close runs asynchronously on a Future so that callers — which may be
 * holding the `entries` / `memoryManager` locks — are not blocked by a slow
 * `close()`. NOTE(review): this uses the global ExecutionContext; a dedicated
 * pool may be preferable if `close()` can block for long periods.
 *
 * @param entry the memory entry whose value should be closed when closeable
 * @return the same `entry`, unchanged, for convenient chaining
 */
def manualClose[T <: MemoryEntry[_]](entry: T): T = {
  val entryManualCloseTasks = Future {
    entry match {
      case deserialized: DeserializedMemoryEntry[_] =>
        deserialized.value.foreach {
          case closeable: AutoCloseable =>
            try {
              closeable.close()
            } catch {
              // NonFatal only: let OutOfMemoryError, InterruptedException, etc.
              // propagate. Include the exception so the failure is diagnosable.
              case NonFatal(e) =>
                logWarning("Got NonFatal exception during remove", e)
            }
          case _ =>
        }
      case _ =>
    }
  }
  entryManualCloseTasks.onComplete {
    case Success(_) =>
    case Failure(e) =>
      // Throwing here would only reach the executor thread's uncaught-exception
      // handler and be silently lost; log instead so the failure is visible.
      logWarning("Failed to close memory entry", e)
  }
  entry
}
+
def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val entry = entries.synchronized {
- entries.remove(blockId)
+ val removed = entries.remove(blockId)
+ manualClose(removed)
}
if (entry != null) {
entry match {
@@ -405,6 +434,7 @@ private[spark] class MemoryStore(
def clear(): Unit = memoryManager.synchronized {
entries.synchronized {
+ entries.values.asScala.foreach(manualClose)
entries.clear()
}
onHeapUnrollMemoryMap.clear()
diff --git
a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index d6a4e5b..fea4882 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -546,4 +546,104 @@ class MemoryStoreSuite
}
}
}
+
test("put user-defined objects to MemoryStore and remove") {
  val (memoryStore, _) = makeMemoryStore(12000)
  // Tracks outstanding "native" allocations so the test can observe release.
  case class DummyAllocator() {
    private var allocated: Int = 0
    def alloc(size: Int): Unit = synchronized {
      allocated += size
    }
    def release(size: Int): Unit = synchronized {
      allocated -= size
    }
    def getAllocatedMemory: Int = synchronized {
      allocated
    }
  }
  // Simulates an off-heap resource: acquires on construction and releases
  // via AutoCloseable.close(), which MemoryStore should call on eviction.
  case class NativeObject(alloc: DummyAllocator, size: Int)
      extends KnownSizeEstimation
      with AutoCloseable {
    alloc.alloc(size)
    var allocated_size: Int = size
    override def estimatedSize: Long = allocated_size
    override def close(): Unit = synchronized {
      alloc.release(allocated_size)
      allocated_size = 0
    }
  }
  val allocator = DummyAllocator()
  val nativeObjList = List.fill(40)(NativeObject(allocator, 100))
  def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]]
  def putIteratorAsValues[T](
      blockId: BlockId,
      iter: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
    memoryStore.putIteratorAsValues(blockId, iter, classTag)
  }

  // Unroll with plenty of space. This should succeed and cache both blocks.
  assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight)
  assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight)

  memoryStore.remove("b1")
  memoryStore.remove("b2")

  // close() runs asynchronously; wait with a deadline so a regression fails
  // the test instead of hanging the suite forever.
  val deadline = System.currentTimeMillis() + 30000
  while (allocator.getAllocatedMemory > 0 && System.currentTimeMillis() < deadline) {
    Thread.sleep(100)
  }
  assert(allocator.getAllocatedMemory == 0)
}
+
test("put user-defined objects to MemoryStore and clear") {
  val (memoryStore, _) = makeMemoryStore(12000)
  // Tracks outstanding "native" allocations so the test can observe release.
  case class DummyAllocator() {
    private var allocated: Int = 0
    def alloc(size: Int): Unit = synchronized {
      allocated += size
    }
    def release(size: Int): Unit = synchronized {
      allocated -= size
    }
    def getAllocatedMemory: Int = synchronized {
      allocated
    }
  }
  // Simulates an off-heap resource. The deliberate sleep in close() makes the
  // release slow, exercising the asynchronous close path of clear().
  case class NativeObject(alloc: DummyAllocator, size: Int)
      extends KnownSizeEstimation
      with AutoCloseable {

    alloc.alloc(size)
    var allocated_size: Int = size
    override def estimatedSize: Long = allocated_size
    override def close(): Unit = synchronized {
      Thread.sleep(10)
      alloc.release(allocated_size)
      allocated_size = 0
    }
  }
  val allocator = DummyAllocator()
  val nativeObjList = List.fill(40)(NativeObject(allocator, 100))
  def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]]
  def putIteratorAsValues[T](
      blockId: BlockId,
      iter: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
    memoryStore.putIteratorAsValues(blockId, iter, classTag)
  }

  // Unroll with plenty of space. This should succeed and cache both blocks.
  assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight)
  assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight)

  // clear() is side-effecting, so call it with parentheses.
  memoryStore.clear()

  // close() runs asynchronously; wait with a deadline so a regression fails
  // the test instead of hanging the suite forever.
  val deadline = System.currentTimeMillis() + 30000
  while (allocator.getAllocatedMemory > 0 && System.currentTimeMillis() < deadline) {
    Thread.sleep(100)
  }
  assert(allocator.getAllocatedMemory == 0)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]