[CARBONDATA-2184] Improve memory reuse for heap memory in `HeapMemoryAllocator`
The description in [SPARK-21860|https://issues.apache.org/jira/browse/SPARK-21860]: In `HeapMemoryAllocator`, memory is allocated from a pool whose key is the requested memory size. However, some sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same, because memory is allocated in multiples of 8 bytes. Keying the pool by the 8-byte-aligned size therefore improves memory reuse. This closes #1982 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55fe349d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55fe349d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55fe349d Branch: refs/heads/branch-1.3 Commit: 55fe349d0b1f731565a471c42d37eac971e46168 Parents: abb0a0b Author: Zhang Zhichao <441586...@qq.com> Authored: Sun Feb 18 00:55:04 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sat Mar 3 18:02:23 2018 +0530 ---------------------------------------------------------------------- .../core/memory/HeapMemoryAllocator.java | 52 +++++++++----- .../carbondata/core/util/CarbonProperties.java | 19 ++++++ .../core/memory/MemoryAllocatorUnitTest.java | 71 ++++++++++++++++++++ 3 files changed, 127 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java index 5862933..242995b 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java @@ -23,16 +23,27 @@ import java.util.LinkedList; import java.util.Map; import
javax.annotation.concurrent.GuardedBy; +import org.apache.carbondata.core.util.CarbonProperties; + /** * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array. */ public class HeapMemoryAllocator implements MemoryAllocator { - @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> + @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>(); - private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; + private int poolingThresholdBytes; + private boolean shouldPooling = true; + + public HeapMemoryAllocator() { + poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes(); + // if set 'poolingThresholdBytes' to -1, it should not go through the pooling mechanism. + if (poolingThresholdBytes == -1) { + shouldPooling = false; + } + } /** * Returns true if allocations of the given size should go through the pooling mechanism and @@ -40,42 +51,53 @@ public class HeapMemoryAllocator implements MemoryAllocator { */ private boolean shouldPool(long size) { // Very small allocations are less likely to benefit from pooling. 
- return size >= POOLING_THRESHOLD_BYTES; + return shouldPooling && (size >= poolingThresholdBytes); } @Override public MemoryBlock allocate(long size) throws OutOfMemoryError { - if (shouldPool(size)) { + int numWords = (int) ((size + 7) / 8); + long alignedSize = numWords * 8L; + assert (alignedSize >= size); + if (shouldPool(alignedSize)) { synchronized (this) { - final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize); if (pool != null) { while (!pool.isEmpty()) { - final WeakReference<MemoryBlock> blockReference = pool.pop(); - final MemoryBlock memory = blockReference.get(); - if (memory != null) { - assert (memory.size() == size); + final WeakReference<long[]> arrayReference = pool.pop(); + final long[] array = arrayReference.get(); + if (array != null) { + assert (array.length * 8L >= size); + MemoryBlock memory = new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size); // reuse this MemoryBlock memory.setFreedStatus(false); return memory; } } - bufferPoolsBySize.remove(size); + bufferPoolsBySize.remove(alignedSize); } } } - long[] array = new long[(int) ((size + 7) / 8)]; + long[] array = new long[numWords]; return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size); } @Override public void free(MemoryBlock memory) { final long size = memory.size(); - if (shouldPool(size)) { + + // As an additional layer of defense against use-after-free bugs, we mutate the + // MemoryBlock to null out its reference to the long[] array. 
+ long[] array = (long[]) memory.obj; + memory.setObjAndOffset(null, 0); + + long alignedSize = ((size + 7) / 8) * 8; + if (shouldPool(alignedSize)) { synchronized (this) { - LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize); if (pool == null) { pool = new LinkedList<>(); - bufferPoolsBySize.put(size, pool); + bufferPoolsBySize.put(alignedSize, pool); } - pool.add(new WeakReference<>(memory)); + pool.add(new WeakReference<>(array)); } } memory.setFreedStatus(true); http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 3dc7b8f..667c45c 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1351,4 +1351,23 @@ public final class CarbonProperties { unsafeSortStorageMemory + ""); } + /** + * Get the heap memory pooling threshold bytes. + */ + public int getHeapMemoryPoolingThresholdBytes() { + int thresholdSize; + try { + thresholdSize = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT)); + } catch (NumberFormatException exc) { + LOGGER.error( + "The heap memory pooling threshold bytes is invalid. 
Using the default value " + + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); + thresholdSize = Integer.parseInt( + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); + } + return thresholdSize; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55fe349d/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java new file mode 100644 index 0000000..df1e103 --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/memory/MemoryAllocatorUnitTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.memory; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; + +public class MemoryAllocatorUnitTest { + + @Test + public void testHeapMemoryReuse() { + MemoryAllocator heapMem = new HeapMemoryAllocator(); + // The size is less than 1024 * 1024, + // allocate new memory every time. + MemoryBlock onheap1 = heapMem.allocate(513); + Object obj1 = onheap1.getBaseObject(); + heapMem.free(onheap1); + MemoryBlock onheap2 = heapMem.allocate(514); + Assert.assertNotEquals(obj1, onheap2.getBaseObject()); + + // The size is greater than 1024 * 1024, + // reuse the previous memory which has released. + MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1); + Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1); + Object obj3 = onheap3.getBaseObject(); + heapMem.free(onheap3); + MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7); + Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7); + Assert.assertEquals(obj3, onheap4.getBaseObject()); + } + + @Test + public void testHeapMemoryNotPool() { + // not pool + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, "-1"); + + MemoryAllocator heapMem = new HeapMemoryAllocator(); + MemoryBlock onheap1 = heapMem.allocate(513); + Object obj1 = onheap1.getBaseObject(); + heapMem.free(onheap1); + MemoryBlock onheap2 = heapMem.allocate(514); + Assert.assertNotEquals(obj1, onheap2.getBaseObject()); + + MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1); + Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1); + Object obj3 = onheap3.getBaseObject(); + heapMem.free(onheap3); + MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7); + Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7); + Assert.assertNotEquals(obj3, onheap4.getBaseObject()); + } +}