This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3ebc23e4bff05af9c79a41bf3f9cd17f777f6c6f Author: LinChen <[email protected]> AuthorDate: Tue Mar 15 11:41:08 2022 +0800 Reduce unnecessary expansions for ConcurrentLong map and set (#14562) (cherry picked from commit 8e7006f899bd2b9ed9482ab2ce1ee35233957d03) --- .../util/collections/ConcurrentLongHashMap.java | 10 ++++++ .../util/collections/ConcurrentLongPairSet.java | 11 ++++--- .../util/collections/ConcurrentOpenHashMap.java | 19 ++++++++++++ .../util/collections/ConcurrentOpenHashSet.java | 18 +++++++++++ .../collections/ConcurrentLongHashMapTest.java | 19 ++++++++++++ .../collections/ConcurrentLongPairSetTest.java | 19 ++++++++++++ .../collections/ConcurrentOpenHashMapTest.java | 19 ++++++++++++ .../collections/ConcurrentOpenHashSetTest.java | 36 +++++++++++++++++----- 8 files changed, 138 insertions(+), 13 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java index d8b0c32cd3c..90aa61a6d9b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java @@ -451,6 +451,16 @@ public class ConcurrentLongHashMap<V> { if (nextValueInArray == EmptyValue) { values[bucket] = (V) EmptyValue; --usedBuckets; + + // Cleanup all the buckets that were in `DeletedValue` state, + // so that we can reduce unnecessary expansions + int lastBucket = signSafeMod(bucket - 1, capacity); + while (values[lastBucket] == DeletedValue) { + values[lastBucket] = (V) EmptyValue; + --usedBuckets; + + lastBucket = signSafeMod(lastBucket - 1, capacity); + } } else { values[bucket] = (V) DeletedValue; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java index abbe11576a9..66ecaee4bfa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java @@ -493,14 +493,15 @@ public class ConcurrentLongPairSet implements LongPairSet { table[bucket + 1] = EmptyItem; --usedBuckets; - // Cleanup all the buckets that were in `DeletedKey` state, + // Cleanup all the buckets that were in `DeletedItem` state, // so that we can reduce unnecessary expansions - bucket = (bucket - 1) & (table.length - 1); - while (table[bucket] == DeletedItem) { - table[bucket] = EmptyItem; + int lastBucket = (bucket - 2) & (table.length - 1); + while (table[lastBucket] == DeletedItem) { + table[lastBucket] = EmptyItem; + table[lastBucket + 1] = EmptyItem; --usedBuckets; - bucket = (bucket - 1) & (table.length - 1); + lastBucket = (lastBucket - 2) & (table.length - 1); } } else { table[bucket] = DeletedItem; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java index 1ccbeb3b6b5..e039079eeb3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java @@ -173,6 +173,14 @@ public class ConcurrentOpenHashMap<K, V> { } } + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section<K, V> s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + public long size() { long size = 0; for (Section<K, V> s : sections) { @@ -441,6 +449,17 @@ public class ConcurrentOpenHashMap<K, V> { table[bucket] = EmptyKey; table[bucket + 1] = null; --usedBuckets; + + // Cleanup all the 
buckets that were in `DeletedKey` state, + // so that we can reduce unnecessary expansions + int lastBucket = (bucket - 2) & (table.length - 1); + while (table[lastBucket] == DeletedKey) { + table[lastBucket] = EmptyKey; + table[lastBucket + 1] = null; + --usedBuckets; + + lastBucket = (lastBucket - 2) & (table.length - 1); + } } else { table[bucket] = DeletedKey; table[bucket + 1] = null; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java index 28f0df0ff20..6dd6e6a4b63 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java @@ -152,6 +152,14 @@ public class ConcurrentOpenHashSet<V> { } } + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section<V> s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + public long size() { long size = 0; for (int i = 0; i < sections.length; i++) { @@ -477,6 +485,16 @@ public class ConcurrentOpenHashSet<V> { if (values[nextInArray] == EmptyValue) { values[bucket] = (V) EmptyValue; --usedBuckets; + + // Cleanup all the buckets that were in `DeletedValue` state, + // so that we can reduce unnecessary expansions + int lastBucket = signSafeMod(bucket - 1, capacity); + while (values[lastBucket] == DeletedValue) { + values[lastBucket] = (V) EmptyValue; + --usedBuckets; + + lastBucket = signSafeMod(lastBucket - 1, capacity); + } } else { values[bucket] = (V) DeletedValue; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java index 6cf126cf2ff..205cf91b47d 100644 --- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java @@ -107,6 +107,25 @@ public class ConcurrentLongHashMapTest { assertEquals(map.size(), 3); } + @Test + public void testReduceUnnecessaryExpansions() { + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .build(); + assertNull(map.put(1, "v1")); + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + assertNull(map.put(4, "v4")); + + assertTrue(map.remove(1, "v1")); + assertTrue(map.remove(2, "v2")); + assertTrue(map.remove(3, "v3")); + assertTrue(map.remove(4, "v4")); + + assertEquals(0, map.getUsedBucketCount()); + } + @Test public void testClear() { ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java index a8d3e1d0603..86030f21619 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java @@ -74,6 +74,25 @@ public class ConcurrentLongPairSetTest { } } + @Test + public void testReduceUnnecessaryExpansions() { + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .build(); + assertTrue(set.add(1, 1)); + assertTrue(set.add(2, 2)); + assertTrue(set.add(3, 3)); + assertTrue(set.add(4, 4)); + + assertTrue(set.remove(1, 1)); + assertTrue(set.remove(2, 2)); + assertTrue(set.remove(3, 3)); + assertTrue(set.remove(4, 4)); + + assertEquals(0, set.getUsedBucketCount()); + } + @Test public void simpleInsertions() { 
ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java index 7919485d9b6..cec52ea3ded 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java @@ -109,6 +109,25 @@ public class ConcurrentOpenHashMapTest { assertEquals(map.size(), 3); } + @Test + public void testReduceUnnecessaryExpansions() { + ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .build(); + assertNull(map.put("1", "1")); + assertNull(map.put("2", "2")); + assertNull(map.put("3", "3")); + assertNull(map.put("4", "4")); + + assertEquals(map.remove("1"), "1"); + assertEquals(map.remove("2"), "2"); + assertEquals(map.remove("3"), "3"); + assertEquals(map.remove("4"), "4"); + + assertEquals(0, map.getUsedBucketCount()); + } + @Test public void testClear() { ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder() diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java index af62948b64a..6c82293bec2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java @@ -91,24 +91,44 @@ public class ConcurrentOpenHashSetTest { assertEquals(set.size(), 3); } + @Test + public void testReduceUnnecessaryExpansions() { + ConcurrentOpenHashSet<String> set = + 
ConcurrentOpenHashSet.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .build(); + + assertTrue(set.add("1")); + assertTrue(set.add("2")); + assertTrue(set.add("3")); + assertTrue(set.add("4")); + + assertTrue(set.remove("1")); + assertTrue(set.remove("2")); + assertTrue(set.remove("3")); + assertTrue(set.remove("4")); + assertEquals(0, set.getUsedBucketCount()); + } + @Test public void testClear() { - ConcurrentOpenHashSet<String> map = + ConcurrentOpenHashSet<String> set = ConcurrentOpenHashSet.<String>newBuilder() .expectedItems(2) .concurrencyLevel(1) .autoShrink(true) .mapIdleFactor(0.25f) .build(); - assertTrue(map.capacity() == 4); + assertTrue(set.capacity() == 4); - assertTrue(map.add("k1")); - assertTrue(map.add("k2")); - assertTrue(map.add("k3")); + assertTrue(set.add("k1")); + assertTrue(set.add("k2")); + assertTrue(set.add("k3")); - assertTrue(map.capacity() == 8); - map.clear(); - assertTrue(map.capacity() == 4); + assertTrue(set.capacity() == 8); + set.clear(); + assertTrue(set.capacity() == 4); } @Test
