This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 443bca1 LARGE_MESSAGE connection allocates heap buffer when BufferPool exhausted 443bca1 is described below commit 443bca18839268cd100930b380e0534b052a8c89 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon Nov 4 16:53:30 2019 +0000 LARGE_MESSAGE connection allocates heap buffer when BufferPool exhausted patch by Benedict; reviewed by David Capwell for CASSANDRA-15358 --- src/java/org/apache/cassandra/config/Config.java | 3 +- .../cassandra/config/DatabaseDescriptor.java | 5 -- .../cassandra/net/AsyncStreamingOutputPlus.java | 2 +- .../cassandra/net/LocalBufferPoolAllocator.java | 4 +- .../apache/cassandra/utils/memory/BufferPool.java | 79 +++++++------------ .../apache/cassandra/utils/memory/MemoryUtil.java | 5 ++ .../cassandra/utils/memory/LongBufferPoolTest.java | 3 +- .../cassandra/utils/memory/BufferPoolTest.java | 88 ++++++---------------- 8 files changed, 61 insertions(+), 128 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 3fc314f..b26b253 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -314,7 +314,8 @@ public class Config */ public Boolean file_cache_round_up; - public boolean buffer_pool_use_heap_if_exhausted = true; + @Deprecated + public boolean buffer_pool_use_heap_if_exhausted; public DiskOptimizationStrategy disk_optimization_strategy = DiskOptimizationStrategy.ssd; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7af310e..e6bee3a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2429,11 +2429,6 @@ public class DatabaseDescriptor return conf.file_cache_round_up; } - public static boolean getBufferPoolUseHeapIfExhausted() - 
{ - return conf.buffer_pool_use_heap_if_exhausted; - } - public static DiskOptimizationStrategy getDiskOptimizationStrategy() { return diskOptimizationStrategy; diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java index a52070e..a2dff41 100644 --- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java +++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java @@ -137,7 +137,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus throw new IllegalStateException("Can only allocate one ByteBuffer"); limiter.acquire(size); holder.promise = beginFlush(size, defaultLowWaterMark, defaultHighWaterMark); - holder.buffer = BufferPool.get(size); + holder.buffer = BufferPool.get(size, BufferType.OFF_HEAP); return holder.buffer; }); } diff --git a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java index 384563f..b2d487f 100644 --- a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java +++ b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java @@ -45,7 +45,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator { if (!eventLoop.inEventLoop()) throw new IllegalStateException("get() called from outside of owning event loop"); - return pool.get(size, false); + return pool.get(size); } @Override @@ -53,7 +53,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator { if (!eventLoop.inEventLoop()) throw new IllegalStateException("getAtLeast() called from outside of owning event loop"); - return pool.get(size, true); + return pool.getAtLeast(size); } @Override diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index c2e8108..3fc7992 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ 
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -51,6 +51,7 @@ import org.apache.cassandra.utils.concurrent.Ref; import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.utils.ExecutorUtils.*; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; +import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect; /** * A pool of ByteBuffers that can be recycled. @@ -73,14 +74,8 @@ public class BufferPool @VisibleForTesting public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; - @VisibleForTesting - public static boolean ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = DatabaseDescriptor.getBufferPoolUseHeapIfExhausted(); - private static Debug debug; - @VisibleForTesting - public static boolean DISABLED = Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", "false")); - private static final Logger logger = LoggerFactory.getLogger(BufferPool.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES); private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); @@ -103,36 +98,26 @@ public class BufferPool } }; - public static ByteBuffer get(int size) - { - if (DISABLED) - return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); - else - return localPool.get().get(size, false, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); - } - public static ByteBuffer get(int size, BufferType bufferType) { - boolean onHeap = bufferType == BufferType.ON_HEAP; - if (DISABLED || onHeap) - return allocate(size, onHeap); + if (bufferType == BufferType.ON_HEAP) + return allocate(size, bufferType); else - return localPool.get().get(size, false, onHeap); + return localPool.get().get(size); } public static ByteBuffer getAtLeast(int size, BufferType bufferType) { - boolean onHeap = bufferType == BufferType.ON_HEAP; - if (DISABLED || onHeap) - return allocate(size, onHeap); + if (bufferType == 
BufferType.ON_HEAP) + return allocate(size, bufferType); else - return localPool.get().get(size, true, onHeap); + return localPool.get().getAtLeast(size); } /** Unlike the get methods, this will return null if the pool is exhausted */ public static ByteBuffer tryGet(int size) { - return localPool.get().tryGet(size, true); + return localPool.get().tryGet(size, false); } public static ByteBuffer tryGetAtLeast(int size) @@ -140,23 +125,22 @@ public class BufferPool return localPool.get().tryGet(size, true); } - private static ByteBuffer allocate(int size, boolean onHeap) + private static ByteBuffer allocate(int size, BufferType bufferType) { - return onHeap + return bufferType == BufferType.ON_HEAP ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size); } public static void put(ByteBuffer buffer) { - if (!(DISABLED || buffer.hasArray())) + if (isExactlyDirect(buffer)) localPool.get().put(buffer); } public static void putUnusedPortion(ByteBuffer buffer) { - - if (!(DISABLED || buffer.hasArray())) + if (isExactlyDirect(buffer)) { LocalPool pool = localPool.get(); if (buffer.limit() > 0) @@ -210,12 +194,8 @@ public class BufferPool assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple - if (DISABLED) - logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); - else - logger.info("Global buffer pool is enabled, when pool is exhausted (max is {}) it will allocate {}", - prettyPrintMemory(MEMORY_USAGE_THRESHOLD), - ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? 
"on heap" : "off heap"); + logger.info("Global buffer pool limit is {}", + prettyPrintMemory(MEMORY_USAGE_THRESHOLD)); } private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>(); @@ -249,9 +229,12 @@ public class BufferPool long cur = memoryUsage.get(); if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD) { - noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}", - prettyPrintMemory(MEMORY_USAGE_THRESHOLD), - prettyPrintMemory(MACRO_CHUNK_SIZE)); + if (MEMORY_USAGE_THRESHOLD > 0) + { + noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}", + prettyPrintMemory(MEMORY_USAGE_THRESHOLD), + prettyPrintMemory(MACRO_CHUNK_SIZE)); + } return null; } if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE)) @@ -633,25 +616,15 @@ public class BufferPool public ByteBuffer get(int size) { - return get(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); - } - - public ByteBuffer get(int size, boolean allocateOnHeapWhenExhausted) - { - return get(size, false, allocateOnHeapWhenExhausted); + return get(size, false); } public ByteBuffer getAtLeast(int size) { - return getAtLeast(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); - } - - public ByteBuffer getAtLeast(int size, boolean allocateOnHeapWhenExhausted) - { - return get(size, true, allocateOnHeapWhenExhausted); + return get(size, true); } - private ByteBuffer get(int size, boolean sizeIsLowerBound, boolean allocateOnHeapWhenExhausted) + private ByteBuffer get(int size, boolean sizeIsLowerBound) { ByteBuffer ret = tryGet(size, sizeIsLowerBound); if (ret != null) @@ -671,12 +644,12 @@ public class BufferPool } metrics.misses.mark(); - return allocate(size, allocateOnHeapWhenExhausted); + return allocate(size, BufferType.OFF_HEAP); } public ByteBuffer tryGet(int size) { - return tryGet(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); + return tryGet(size, false); } public ByteBuffer tryGetAtLeast(int size) @@ -890,7 +863,7 @@ public class BufferPool Chunk(Recycler recycler, ByteBuffer 
slab) { - assert !slab.hasArray(); + assert MemoryUtil.isExactlyDirect(slab); this.recycler = recycler; this.slab = slab; this.baseAddress = MemoryUtil.getAddress(slab); diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java index 6b30f44..e194962 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java +++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java @@ -196,6 +196,11 @@ public abstract class MemoryUtil return instance; } + public static boolean isExactlyDirect(ByteBuffer buffer) + { + return buffer.getClass() == DIRECT_BYTE_BUFFER_CLASS; + } + public static Object getAttachment(ByteBuffer instance) { assert instance.getClass() == DIRECT_BYTE_BUFFER_CLASS; diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index 838038a..c8368dd 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.DynamicList; import static org.junit.Assert.*; @@ -424,7 +425,7 @@ public class LongBufferPoolTest BufferCheck allocate(int size) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); BufferCheck check = new BufferCheck(buffer, rand.nextLong()); assertEquals(size, buffer.capacity()); diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java index 62cb33b..85ca2d0 100644 --- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java +++ 
b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java @@ -47,7 +47,6 @@ public class BufferPoolTest public void setUp() { BufferPool.MEMORY_USAGE_THRESHOLD = 8 * 1024L * 1024L; - BufferPool.DISABLED = false; } @After @@ -61,7 +60,7 @@ public class BufferPoolTest { final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE; - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertEquals(true, buffer.isDirect()); @@ -90,7 +89,7 @@ public class BufferPoolTest private void checkPageAligned(int size) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertTrue(buffer.isDirect()); @@ -107,11 +106,11 @@ public class BufferPoolTest final int size1 = 1024; final int size2 = 2048; - ByteBuffer buffer1 = BufferPool.get(size1); + ByteBuffer buffer1 = BufferPool.get(size1, BufferType.OFF_HEAP); assertNotNull(buffer1); assertEquals(size1, buffer1.capacity()); - ByteBuffer buffer2 = BufferPool.get(size2); + ByteBuffer buffer2 = BufferPool.get(size2, BufferType.OFF_HEAP); assertNotNull(buffer2); assertEquals(size2, buffer2.capacity()); @@ -129,23 +128,13 @@ public class BufferPoolTest @Test public void testMaxMemoryExceededDirect() { - boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED; - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false; - requestDoubleMaxMemory(); - - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur; } @Test public void testMaxMemoryExceededHeap() { - boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED; - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; - requestDoubleMaxMemory(); - - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur; } @Test @@ -180,13 +169,10 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { - ByteBuffer 
buffer = BufferPool.get(bufferSize); + ByteBuffer buffer = BufferPool.get(bufferSize, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(bufferSize, buffer.capacity()); - - if (BufferPool.sizeInBytes() > BufferPool.MEMORY_USAGE_THRESHOLD) - assertEquals(BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED, !buffer.isDirect()); - + assertTrue(buffer.isDirect()); buffers.add(buffer); } @@ -199,7 +185,7 @@ public class BufferPoolTest { final int size = BufferPool.NORMAL_CHUNK_SIZE + 1; - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); BufferPool.put(buffer); @@ -214,13 +200,13 @@ public class BufferPoolTest List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers); List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) - buffers1.add(BufferPool.get(size)); + buffers1.add(BufferPool.get(size, BufferType.OFF_HEAP)); BufferPool.Chunk chunk1 = BufferPool.unsafeCurrentChunk(); assertNotNull(chunk1); for (int i = 0; i < numBuffers; i++) - buffers2.add(BufferPool.get(size)); + buffers2.add(BufferPool.get(size, BufferType.OFF_HEAP)); assertEquals(2, BufferPool.unsafeNumChunks()); @@ -308,7 +294,7 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots); for (int i = 0; i < maxFreeSlots; i++) { - buffers.add(BufferPool.get(size)); + buffers.add(BufferPool.get(size, BufferType.OFF_HEAP)); } BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); @@ -346,7 +332,7 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(sizes.length); for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i]); + ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); @@ -386,7 +372,7 @@ public class BufferPoolTest List<ByteBuffer> buffers = new ArrayList<>(sizes.length); 
for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i]); + ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); @@ -420,7 +406,7 @@ public class BufferPoolTest for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); buffers.add(buffer); addresses.add(MemoryUtil.getAddress(buffer)); } @@ -432,7 +418,7 @@ public class BufferPoolTest for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); addresses.remove(MemoryUtil.getAddress(buffer)); @@ -502,7 +488,7 @@ public class BufferPoolTest private void checkBuffer(int size) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE) @@ -530,7 +516,7 @@ public class BufferPoolTest for (int size : sizes) { - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); buffers.add(buffer); @@ -549,7 +535,7 @@ public class BufferPoolTest private void checkBufferWithGivenSlots(int size, long freeSlots) { //first allocate to make sure there is a chunk - ByteBuffer buffer = BufferPool.get(size); + ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); // now get the current chunk and override the free slots mask BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); @@ -557,7 +543,7 @@ public class BufferPoolTest long oldFreeSlots = chunk.setFreeSlots(freeSlots); // now check we can still get the buffer with the free slots mask changed - ByteBuffer buffer2 = BufferPool.get(size); + ByteBuffer buffer2 = 
BufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); BufferPool.put(buffer2); @@ -569,7 +555,7 @@ public class BufferPoolTest @Test public void testZeroSizeRequest() { - ByteBuffer buffer = BufferPool.get(0); + ByteBuffer buffer = BufferPool.get(0, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(0, buffer.capacity()); BufferPool.put(buffer); @@ -578,35 +564,7 @@ public class BufferPoolTest @Test(expected = IllegalArgumentException.class) public void testNegativeSizeRequest() { - BufferPool.get(-1); - } - - @Test - public void testBufferPoolDisabled() - { - BufferPool.DISABLED = true; - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; - ByteBuffer buffer = BufferPool.get(1024); - assertEquals(0, BufferPool.unsafeNumChunks()); - assertNotNull(buffer); - assertEquals(1024, buffer.capacity()); - assertFalse(buffer.isDirect()); - assertNotNull(buffer.array()); - BufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); - - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false; - buffer = BufferPool.get(1024); - assertEquals(0, BufferPool.unsafeNumChunks()); - assertNotNull(buffer); - assertEquals(1024, buffer.capacity()); - assertTrue(buffer.isDirect()); - BufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); - - // clean-up - BufferPool.DISABLED = false; - BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; + BufferPool.get(-1, BufferType.OFF_HEAP); } @Test @@ -722,7 +680,7 @@ public class BufferPoolTest for (int j = 0; j < threadSizes.length; j++) { - ByteBuffer buffer = BufferPool.get(threadSizes[j]); + ByteBuffer buffer = BufferPool.get(threadSizes[j], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(threadSizes[j], buffer.capacity()); @@ -791,7 +749,7 @@ public class BufferPoolTest int sum = 0; for (int i = 0; i < sizes.length; i++) { - buffers[i] = BufferPool.get(sizes[i]); + buffers[i] = BufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffers[i]); assertEquals(sizes[i], 
buffers[i].capacity()); sum += BufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity()); @@ -852,7 +810,7 @@ public class BufferPoolTest assertTrue(BufferPool.unsafeCurrentChunk().isFree()); //make sure the main thread can still allocate buffers - ByteBuffer buffer = BufferPool.get(sizes[0]); + ByteBuffer buffer = BufferPool.get(sizes[0], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(sizes[0], buffer.capacity()); BufferPool.put(buffer); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org