[CARBONDATA-2184]Improve memory reuse for heap memory in `HeapMemoryAllocator`

As described in
[SPARK-21860|https://issues.apache.org/jira/browse/SPARK-21860]:
In `HeapMemoryAllocator`, memory is allocated from a pool whose key is the
requested memory size.
However, some sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be
treated as the same size, because memory is always allocated in multiples of
8 bytes.
Keying the pool by the 8-byte-aligned size therefore improves memory reuse.

This closes #1982


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55fe349d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55fe349d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55fe349d

Branch: refs/heads/branch-1.3
Commit: 55fe349d0b1f731565a471c42d37eac971e46168
Parents: abb0a0b
Author: Zhang Zhichao <441586...@qq.com>
Authored: Sun Feb 18 00:55:04 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Mar 3 18:02:23 2018 +0530

----------------------------------------------------------------------
 .../core/memory/HeapMemoryAllocator.java        | 52 +++++++++-----
 .../carbondata/core/util/CarbonProperties.java  | 19 ++++++
 .../core/memory/MemoryAllocatorUnitTest.java    | 71 ++++++++++++++++++++
 3 files changed, 127 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java 
b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index 5862933..242995b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -23,16 +23,27 @@ import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.concurrent.GuardedBy;
 
+import org.apache.carbondata.core.util.CarbonProperties;
+
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM 
long primitive array.
  */
 public class HeapMemoryAllocator implements MemoryAllocator {
 
-  @GuardedBy("this") private final Map<Long, 
LinkedList<WeakReference<MemoryBlock>>>
+  @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<long[]>>>
       bufferPoolsBySize = new HashMap<>();
 
-  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+  private int poolingThresholdBytes;
+  private boolean shouldPooling = true;
+
+  public HeapMemoryAllocator() {
+    poolingThresholdBytes = 
CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
+    // if set 'poolingThresholdBytes' to -1, it should not go through the 
pooling mechanism.
+    if (poolingThresholdBytes == -1) {
+      shouldPooling = false;
+    }
+  }
 
   /**
    * Returns true if allocations of the given size should go through the 
pooling mechanism and
@@ -40,42 +51,53 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
    */
   private boolean shouldPool(long size) {
     // Very small allocations are less likely to benefit from pooling.
-    return size >= POOLING_THRESHOLD_BYTES;
+    return shouldPooling && (size >= poolingThresholdBytes);
   }
 
   @Override public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<MemoryBlock>> pool = 
bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(alignedSize);
         if (pool != null) {
           while (!pool.isEmpty()) {
-            final WeakReference<MemoryBlock> blockReference = pool.pop();
-            final MemoryBlock memory = blockReference.get();
-            if (memory != null) {
-              assert (memory.size() == size);
+            final WeakReference<long[]> arrayReference = pool.pop();
+            final long[] array = arrayReference.get();
+            if (array != null) {
+              assert (array.length * 8L >= size);
+              MemoryBlock memory = new MemoryBlock(array, 
CarbonUnsafe.LONG_ARRAY_OFFSET, size);
               // reuse this MemoryBlock
               memory.setFreedStatus(false);
               return memory;
             }
           }
-          bufferPoolsBySize.remove(size);
+          bufferPoolsBySize.remove(alignedSize);
         }
       }
     }
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
     return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
   }
 
   @Override public void free(MemoryBlock memory) {
     final long size = memory.size();
-    if (shouldPool(size)) {
+
+    // As an additional layer of defense against use-after-free bugs, we 
mutate the
+    // MemoryBlock to null out its reference to the long[] array.
+    long[] array = (long[]) memory.obj;
+    memory.setObjAndOffset(null, 0);
+
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        LinkedList<WeakReference<MemoryBlock>> pool = 
bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(alignedSize);
         if (pool == null) {
           pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
         }
-        pool.add(new WeakReference<>(memory));
+        pool.add(new WeakReference<>(array));
       }
     }
     memory.setFreedStatus(true);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 3dc7b8f..667c45c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1351,4 +1351,23 @@ public final class CarbonProperties {
         unsafeSortStorageMemory + "");
   }
 
+  /**
+   * Get the heap memory pooling threshold bytes.
+   */
+  public int getHeapMemoryPoolingThresholdBytes() {
+    int thresholdSize;
+    try {
+      thresholdSize = Integer.parseInt(CarbonProperties.getInstance()
+          
.getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES,
+              
CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT));
+    } catch (NumberFormatException exc) {
+      LOGGER.error(
+          "The heap memory pooling threshold bytes is invalid. Using the 
default value "
+              + 
CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+      thresholdSize = Integer.parseInt(
+          
CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT);
+    }
+    return thresholdSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
 
b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
new file mode 100644
index 0000000..df1e103
--- /dev/null
+++ 
b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public class MemoryAllocatorUnitTest {
+
+  @Test
+  public void testHeapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than 1024 * 1024,
+    // allocate new memory every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than 1024 * 1024,
+    // reuse the previous memory which has released.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
+
+  @Test
+  public void testHeapMemoryNotPool() {
+    // not pool
+    CarbonProperties.getInstance()
+        
.addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, 
"-1");
+
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertNotEquals(obj3, onheap4.getBaseObject());
+  }
+}

Reply via email to