Repository: spark
Updated Branches:
  refs/heads/master a75f92717 -> 76e019d9b


[SPARK-21860][CORE] Improve memory reuse for heap memory in 
`HeapMemoryAllocator`

## What changes were proposed in this pull request?
In `HeapMemoryAllocator`, memory is allocated from a pool, and the key of the 
pool is the memory size.
However, some memory sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, 
can be treated as the same size, because we allocate memory in multiples of 
8 bytes.
By keying the pool on the size rounded up to a multiple of 8 bytes, we can improve memory reuse.

## How was this patch tested?
Existing tests and added unit tests

Author: liuxian <liu.xi...@zte.com.cn>

Closes #19077 from 10110346/headmemoptimize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e019d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e019d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e019d9

Branch: refs/heads/master
Commit: 76e019d9bdcdca176c79c1cd71ddbf496333bf93
Parents: a75f927
Author: liuxian <liu.xi...@zte.com.cn>
Authored: Thu Feb 8 23:41:30 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Feb 8 23:41:30 2018 +0800

----------------------------------------------------------------------
 .../unsafe/memory/HeapMemoryAllocator.java      | 18 +++++++++-------
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 22 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index a9603c1..2733760 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -46,9 +46,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(alignedSize);
         if (pool != null) {
           while (!pool.isEmpty()) {
             final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
               return memory;
             }
           }
-          bufferPoolsBySize.remove(size);
+          bufferPoolsBySize.remove(alignedSize);
         }
       }
     }
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
     MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, 
size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public class HeapMemoryAllocator implements 
MemoryAllocator {
     long[] array = (long[]) memory.obj;
     memory.setObjAndOffset(null, 0);
 
-    if (shouldPool(size)) {
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(alignedSize);
         if (pool == null) {
           pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
         }
         pool.add(new WeakReference<>(array));
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/76e019d9/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git 
a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java 
b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 6285483..71c53d3 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.unsafe;
 
+import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
@@ -134,4 +135,25 @@ public class PlatformUtilSuite {
       MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     MemoryAllocator.UNSAFE.free(offheap);
   }
+
+  @Test
+  public void heapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than 
`HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,allocate new memory every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // reuse the previous memory which has released.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to