This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ab1d708e6bb7df0a71458f3d1e64105aceba3dee Author: feynmanlin <[email protected]> AuthorDate: Tue Jun 7 08:52:40 2022 +0800 Support shrink for TripleLongPriorityQueue (#15936) * Support shrinkage in TripleLongPriorityQueue * Add unit test * Remove unused code * style * Address comments --- .../util/collections/TripleLongPriorityQueue.java | 59 ++++++++++++++++++++-- .../collections/TripleLongPriorityQueueTest.java | 35 +++++++++++++ 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java index 1d8d909beae..487c2284cff 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.util.collections; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -31,16 +32,30 @@ public class TripleLongPriorityQueue implements AutoCloseable { private static final int SIZE_OF_LONG = 8; private static final int DEFAULT_INITIAL_CAPACITY = 16; + private static final float DEFAULT_SHRINK_FACTOR = 0.5f; // Each item is composed of 3 longs private static final int ITEMS_COUNT = 3; private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG; - private final ByteBuf buffer; + /** + * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage. + */ + private static final float RESERVATION_FACTOR = 0.9f; + + private ByteBuf buffer; + + private final int initialCapacity; private int capacity; private int size; + /** + * When size < capacity * shrinkFactor, may trigger shrinking. + */ + private final float shrinkFactor; + + private float shrinkThreshold; /** * Create a new priority queue with default initial capacity. @@ -49,14 +64,22 @@ public class TripleLongPriorityQueue implements AutoCloseable { this(DEFAULT_INITIAL_CAPACITY); } + public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) { + checkArgument(shrinkFactor > 0); + this.initialCapacity = initialCapacity; + this.capacity = initialCapacity; + this.shrinkThreshold = this.capacity * shrinkFactor; + this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE); + this.size = 0; + this.shrinkFactor = shrinkFactor; + } + /** * Create a new priority queue with a given initial capacity. * @param initialCapacity */ public TripleLongPriorityQueue(int initialCapacity) { - capacity = initialCapacity; - buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG); - size = 0; + this(initialCapacity, DEFAULT_SHRINK_FACTOR); } /** @@ -122,6 +145,7 @@ public class TripleLongPriorityQueue implements AutoCloseable { swap(0, size - 1); size--; siftDown(0); + shrinkCapacity(); } /** @@ -144,14 +168,36 @@ public class TripleLongPriorityQueue implements AutoCloseable { public void clear() { this.buffer.clear(); this.size = 0; + shrinkCapacity(); } private void increaseCapacity() { // For bigger sizes, increase by 50% this.capacity += (capacity <= 256 ? capacity : capacity / 2); + this.shrinkThreshold = this.capacity * shrinkFactor; buffer.capacity(this.capacity * TUPLE_SIZE); } + private void shrinkCapacity() { + if (capacity > initialCapacity && size < shrinkThreshold) { + int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR); + if (decreasingSize <= 0) { + return; + } + if (capacity - decreasingSize <= initialCapacity) { + this.capacity = initialCapacity; + } else { + this.capacity = capacity - decreasingSize; + } + this.shrinkThreshold = this.capacity * shrinkFactor; + + ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE); + buffer.getBytes(0, newBuffer, size * TUPLE_SIZE); + buffer.release(); + this.buffer = newBuffer; + } + } + private void siftUp(int idx) { while (idx > 0) { int parentIdx = (idx - 1) / 2; @@ -229,4 +275,9 @@ public class TripleLongPriorityQueue implements AutoCloseable { buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2); buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3); } + + @VisibleForTesting + ByteBuf getBuffer() { + return buffer; + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java index 4cb1027e0a9..bd3aef86ad1 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java @@ -135,4 +135,39 @@ public class TripleLongPriorityQueueTest { pq.close(); } + + @Test + public void testShrink() throws Exception { + int initialCapacity = 20; + int tupleSize = 3 * 8; + TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f); + pq.add(0, 0, 0); + assertEquals(pq.size(), 1); + assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize); + + // Scale out to capacity * 2 + triggerScaleOut(initialCapacity, pq); + int scaleCapacity = initialCapacity * 2; + assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize); + // Trigger shrinking + for (int i = 0; i < initialCapacity / 2 + 1; i++) { + pq.pop(); + } + int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f); + assertEquals(pq.getBuffer().capacity(), capacity * tupleSize); + // Scale out to capacity * 2 + triggerScaleOut(initialCapacity, pq); + scaleCapacity = capacity * 2; + assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize); + // Trigger shrinking + pq.clear(); + capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f); + assertEquals(pq.getBuffer().capacity(), capacity * tupleSize); + } + + private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) { + for (long i = 0; i < initialCapacity + 1; i++) { + pq.add(i, i, i); + } + } }
