This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f7128614c1dc8f2e6a51c0644353197792297528 Author: Jiwei Guo <[email protected]> AuthorDate: Thu Apr 28 20:30:30 2022 +0800 [improve][broker] Support shrink for ConcurrentSortedLongPairSet (#15354) (cherry picked from commit 24d4d76bb9e39010bae3f4cbd8ddba6422570b4e) --- .../persistent/MessageRedeliveryController.java | 2 +- .../util/collections/ConcurrentLongPairSet.java | 53 ++++++++++++---------- .../collections/ConcurrentSortedLongPairSet.java | 27 +++++++++-- .../common/util/collections/LongPairSet.java | 7 +++ .../ConcurrentSortedLongPairSetTest.java | 43 ++++++++++++++++++ 5 files changed, 105 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index c7f96fffcef..46fa1b2b050 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -36,7 +36,7 @@ public class MessageRedeliveryController { private final ConcurrentLongLongPairHashMap hashesToBeBlocked; public MessageRedeliveryController(boolean allowOutOfOrderDelivery) { - this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true); this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : ConcurrentLongLongPairHashMap diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java index 66ecaee4bfa..7b5e75813fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java @@ -175,6 +175,7 @@ public class ConcurrentLongPairSet implements LongPairSet { return size; } + @Override public long capacity() { long capacity = 0; for (int i = 0; i < sections.length; i++) { @@ -447,20 +448,7 @@ public class ConcurrentLongPairSet implements LongPairSet { bucket = (bucket + 2) & (table.length - 1); } } finally { - if (autoShrink && size < resizeThresholdBelow) { - try { - int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); - int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); - if (newCapacity < capacity && newResizeThresholdUp > size) { - // shrink the hashmap - rehash(newCapacity); - } - } finally { - unlockWrite(stamp); - } - } else { - unlockWrite(stamp); - } + tryShrinkThenUnlock(stamp); } } @@ -469,23 +457,42 @@ public class ConcurrentLongPairSet implements LongPairSet { int removedItems = 0; // Go through all the buckets for this section - for (int bucket = 0; bucket < table.length; bucket += 2) { - long storedItem1 = table[bucket]; - long storedItem2 = table[bucket + 1]; - - if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) { - if (filter.test(storedItem1, storedItem2)) { - long h = hash(storedItem1, storedItem2); - if (remove(storedItem1, storedItem2, (int) h)) { + long stamp = writeLock(); + try { + for (int bucket = 0; bucket < table.length; bucket += 2) { + long storedItem1 = table[bucket]; + long storedItem2 = table[bucket + 1]; + if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) { + if (filter.test(storedItem1, storedItem2)) { + SIZE_UPDATER.decrementAndGet(this); + cleanBucket(bucket); removedItems++; } } } + } finally { + tryShrinkThenUnlock(stamp); } - return removedItems; } + private void tryShrinkThenUnlock(long stamp) { + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } + } + private void cleanBucket(int bucket) { int nextInArray = (bucket + 2) & (table.length - 1); if (table[nextInArray] == EmptyItem) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java index e4cb668fc92..06efd0490d1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java @@ -48,14 +48,15 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC public class ConcurrentSortedLongPairSet implements LongPairSet { protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets = new ConcurrentSkipListMap<>(); - private int expectedItems; - private int concurrencyLevel; + private final int expectedItems; + private final int concurrencyLevel; /** * If {@link #longPairSets} adds and removes the item-set frequently then it allocates and removes * {@link ConcurrentLongPairSet} for the same item multiple times which can lead to gc-puases. To avoid such * situation, avoid removing empty LogPairSet until it reaches max limit. */ - private int maxAllowedSetOnRemove; + private final int maxAllowedSetOnRemove; + private final boolean autoShrink; private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10; public ConcurrentSortedLongPairSet() { @@ -70,10 +71,20 @@ public class ConcurrentSortedLongPairSet implements LongPairSet { this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE); } + public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, boolean autoShrink) { + this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink); + } + public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove) { + this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false); + } + + public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove, + boolean autoShrink) { this.expectedItems = expectedItems; this.concurrencyLevel = concurrencyLevel; this.maxAllowedSetOnRemove = maxAllowedSetOnRemove; + this.autoShrink = autoShrink; } @Override @@ -82,6 +93,7 @@ public class ConcurrentSortedLongPairSet implements LongPairSet { (key) -> ConcurrentLongPairSet.newBuilder() .expectedItems(expectedItems) .concurrencyLevel(concurrencyLevel) + .autoShrink(autoShrink) .build()); return messagesToReplay.add(item1, item2); } @@ -194,6 +206,15 @@ public class ConcurrentSortedLongPairSet implements LongPairSet { return size.get(); } + @Override + public long capacity() { + AtomicLong capacity = new AtomicLong(0); + longPairSets.forEach((item1, longPairSet) -> { + capacity.getAndAdd(longPairSet.capacity()); + }); + return capacity.get(); + } + @Override public boolean contains(long item1, long item2) { ConcurrentLongPairSet longPairSet = longPairSets.get(item1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java index 32de7e4c232..f27b994f777 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java @@ -107,6 +107,13 @@ public interface LongPairSet { */ long size(); + /** + * Returns capacity of the set. + * + * @return + */ + long capacity(); + /** * Checks if given (item1,item2) composite value exists into set. * diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java index fcb9884a795..62dfa21dc81 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; @@ -181,6 +182,20 @@ public class ConcurrentSortedLongPairSetTest { values = new ArrayList<>(set.items()); values.sort(null); assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7))); + + set = new ConcurrentSortedLongPairSet(128, 2, true); + set.add(2, 2); + set.add(1, 3); + set.add(3, 1); + set.add(2, 1); + set.add(3, 2); + set.add(1, 2); + set.add(1, 1); + removeItems = set.removeIf((ledgerId, entryId) -> { + return ComparisonChain.start().compare(ledgerId, 1).compare(entryId, 3) + .result() <= 0; + }); + assertEquals(removeItems, 3); } @Test @@ -245,4 +260,32 @@ public class ConcurrentSortedLongPairSetTest { set.add(1, 1); assertFalse(set.isEmpty()); } + + @Test + public void testShrink() { + LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true); + set.add(0, 0); + assertTrue(set.capacity() == 4); + set.add(0, 1); + assertTrue(set.capacity() == 4); + set.add(1, 1); + assertTrue(set.capacity() == 8); + set.add(1, 2); + assertTrue(set.capacity() == 8); + set.add(1, 3); + set.add(1, 4); + set.add(1, 5); + assertTrue(set.capacity() == 12); + set.remove(1, 5); + // not shrink + assertTrue(set.capacity() == 12); + set.remove(1, 4); + // the internal map does not keep shrinking at every remove() operation + assertTrue(set.capacity() == 12); + set.remove(1, 3); + set.remove(1, 2); + set.remove(1, 1); + // shrink + assertTrue(set.capacity() == 8); + } }
