This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 702d21d8eba3b54748da5fc295ebc3596e9fdf1f Author: lin chen <[email protected]> AuthorDate: Tue Mar 1 21:16:52 2022 +0800 Support shrink in ConcurrentLongHashMap (#14497) (cherry picked from commit 297941964ed739e35ca68aa46d74410cf112b7bc) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +- .../broker/TransactionMetadataStoreService.java | 5 +- .../apache/pulsar/broker/service/ServerCnx.java | 10 +- .../org/apache/pulsar/client/impl/ClientCnx.java | 22 +++- .../client/impl/TransactionMetaStoreHandler.java | 5 +- .../TransactionCoordinatorClientImpl.java | 6 +- .../util/collections/ConcurrentLongHashMap.java | 139 ++++++++++++++++++--- .../collections/ConcurrentLongHashMapTest.java | 122 +++++++++++++++--- 8 files changed, 274 insertions(+), 42 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b7ff480c674..453e77be0c8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -155,8 +155,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected Map<String, String> propertiesMap; protected final MetaStore store; - final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>( - 16 /* initial capacity */, 1 /* number of sections */); + final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = + ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder() + .expectedItems(16) // initial capacity + .concurrencyLevel(1) // number of sections + .build(); protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>(); private volatile Stat ledgersStat; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 7297c334c4c..cd188397989 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -107,8 +107,9 @@ public class TransactionMetadataStoreService { this.tbClient = tbClient; this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer); this.transactionOpRetryTimer = timer; - this.tcLoadSemaphores = new ConcurrentLongHashMap<>(); - this.pendingConnectRequests = new ConcurrentLongHashMap<>(); + this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build(); + this.pendingConnectRequests = + ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build(); this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 46691273e31..7fa6c9dde8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -243,8 +243,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ServiceConfiguration conf = pulsar.getConfiguration(); // This maps are not heavily contended since most accesses are within the cnx thread - this.producers = new ConcurrentLongHashMap<>(8, 1); - this.consumers = new ConcurrentLongHashMap<>(8, 1); + this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); + this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); this.replicatorPrefix = conf.getReplicatorPrefix(); this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection(); this.proxyRoles = conf.getProxyRoles(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e7df2944c8c..4cbf98c4fe6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -106,14 +106,28 @@ public class ClientCnx extends PulsarHandler { private State state; private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests = - new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); // LookupRequests that waiting in client side. private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests; - private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1); - private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap<ProducerImpl<?>> producers = + ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = + ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = - new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>(); private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index baec2f94bdf..d14faa227cb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -61,7 +61,10 @@ public class TransactionMetaStoreHandler extends HandlerState private final long transactionCoordinatorId; private final ConnectionHandler connectionHandler; private final ConcurrentLongHashMap<OpBase<?>> pendingRequests = - new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap.<OpBase<?>>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final ConcurrentLinkedQueue<RequestTime> timeoutQueue; protected final Timer timer; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 81390ec4988..432fc671071 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -51,7 +51,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC private final PulsarClientImpl pulsarClient; private TransactionMetaStoreHandler[] handlers; - private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1); + private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = + ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final AtomicLong epoch = new AtomicLong(0); private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER = diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java index 01627c0529d..d8b0c32cd3c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java @@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> { private static final Object EmptyValue = null; private static final Object DeletedValue = new Object(); - private static final float MapFillFactor = 0.66f; - private static final int DefaultExpectedItems = 256; private static final int DefaultConcurrencyLevel = 16; + private static final float DefaultMapFillFactor = 0.66f; + private static final float DefaultMapIdleFactor = 0.15f; + + private static final float DefaultExpandFactor = 2; + private static final float DefaultShrinkFactor = 2; + + private static final boolean DefaultAutoShrink = false; + + public static <V> Builder<V> newBuilder() { + return new Builder<>(); + } + + /** + * Builder of ConcurrentLongHashMap. + */ + public static class Builder<T> { + int expectedItems = DefaultExpectedItems; + int concurrencyLevel = DefaultConcurrencyLevel; + float mapFillFactor = DefaultMapFillFactor; + float mapIdleFactor = DefaultMapIdleFactor; + float expandFactor = DefaultExpandFactor; + float shrinkFactor = DefaultShrinkFactor; + boolean autoShrink = DefaultAutoShrink; + + public Builder<T> expectedItems(int expectedItems) { + this.expectedItems = expectedItems; + return this; + } + + public Builder<T> concurrencyLevel(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + return this; + } + + public Builder<T> mapFillFactor(float mapFillFactor) { + this.mapFillFactor = mapFillFactor; + return this; + } + + public Builder<T> mapIdleFactor(float mapIdleFactor) { + this.mapIdleFactor = mapIdleFactor; + return this; + } + + public Builder<T> expandFactor(float expandFactor) { + this.expandFactor = expandFactor; + return this; + } + + public Builder<T> shrinkFactor(float shrinkFactor) { + this.shrinkFactor = shrinkFactor; + return this; + } + + public Builder<T> autoShrink(boolean autoShrink) { + this.autoShrink = autoShrink; + return this; + } + + public ConcurrentLongHashMap<T> build() { + return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel, + mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); + } + } + private final Section<V>[] sections; + @Deprecated public ConcurrentLongHashMap() { this(DefaultExpectedItems); } + @Deprecated public ConcurrentLongHashMap(int expectedItems) { this(expectedItems, DefaultConcurrencyLevel); } + @Deprecated public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) { + this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, + DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); + } + + public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel, + float mapFillFactor, float mapIdleFactor, + boolean autoShrink, float expandFactor, float shrinkFactor) { checkArgument(expectedItems > 0); checkArgument(concurrencyLevel > 0); checkArgument(expectedItems >= concurrencyLevel); + checkArgument(mapFillFactor > 0 && mapFillFactor < 1); + checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); + checkArgument(mapFillFactor > mapIdleFactor); + checkArgument(expandFactor > 1); + checkArgument(shrinkFactor > 1); int numSections = concurrencyLevel; int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor); this.sections = (Section<V>[]) new Section[numSections]; for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity); + sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, + autoShrink, expandFactor, shrinkFactor); } } @@ -195,20 +274,35 @@ public class ConcurrentLongHashMap<V> { private volatile V[] values; private volatile int capacity; + private final int initCapacity; private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); private volatile int size; private int usedBuckets; - private int resizeThreshold; - - Section(int capacity) { + private int resizeThresholdUp; + private int resizeThresholdBelow; + private final float mapFillFactor; + private final float mapIdleFactor; + private final float expandFactor; + private final float shrinkFactor; + private final boolean autoShrink; + + Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, + float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); + this.initCapacity = this.capacity; this.keys = new long[this.capacity]; this.values = (V[]) new Object[this.capacity]; this.size = 0; this.usedBuckets = 0; - this.resizeThreshold = (int) (this.capacity * MapFillFactor); + this.autoShrink = autoShrink; + this.mapFillFactor = mapFillFactor; + this.mapIdleFactor = mapIdleFactor; + this.expandFactor = expandFactor; + this.shrinkFactor = shrinkFactor; + this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); + this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); } V get(long key, int keyHash) { @@ -322,9 +416,10 @@ public class ConcurrentLongHashMap<V> { ++bucket; } } finally { - if (usedBuckets >= resizeThreshold) { + if (usedBuckets > resizeThresholdUp) { try { - rehash(); + int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); + rehash(newCapacity); } finally { unlockWrite(stamp); } @@ -373,7 +468,20 @@ public class ConcurrentLongHashMap<V> { } } finally { - unlockWrite(stamp); + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } } } @@ -385,6 +493,9 @@ public class ConcurrentLongHashMap<V> { Arrays.fill(values, EmptyValue); this.size = 0; this.usedBuckets = 0; + if (autoShrink) { + rehash(initCapacity); + } } finally { unlockWrite(stamp); } @@ -439,9 +550,8 @@ public class ConcurrentLongHashMap<V> { } } - private void rehash() { + private void rehash(int newCapacity) { // Expand the hashmap - int newCapacity = capacity * 2; long[] newKeys = new long[newCapacity]; V[] newValues = (V[]) new Object[newCapacity]; @@ -458,7 +568,8 @@ public class ConcurrentLongHashMap<V> { values = newValues; capacity = newCapacity; usedBuckets = size; - resizeThreshold = (int) (capacity * MapFillFactor); + resizeThresholdUp = (int) (capacity * mapFillFactor); + resizeThresholdBelow = (int) (capacity * mapIdleFactor); } private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java index 14d8395ae8c..6cf126cf2ff 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java @@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest { @Test public void testConstructor() { try { - new ConcurrentLongHashMap<String>(0); + ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongHashMap<String>(16, 0); + ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(16) + .concurrencyLevel(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongHashMap<String>(4, 8); + ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(4) + .concurrencyLevel(8) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok @@ -71,7 +79,9 @@ public class ConcurrentLongHashMapTest { @Test public void simpleInsertions() { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(16) + .build(); assertTrue(map.isEmpty()); assertNull(map.put(1, "one")); @@ -97,9 +107,64 @@ public class ConcurrentLongHashMapTest { assertEquals(map.size(), 3); } + @Test + public void testClear() { + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put(1, "v1")); + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + + assertTrue(map.capacity() == 8); + map.clear(); + assertTrue(map.capacity() == 4); + } + + @Test + public void testExpandAndShrink() { + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put(1, "v1")); + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + + // expand hashmap + assertTrue(map.capacity() == 8); + + assertTrue(map.remove(1, "v1")); + // not shrink + assertTrue(map.capacity() == 8); + assertTrue(map.remove(2, "v2")); + // shrink hashmap + assertTrue(map.capacity() == 4); + + // expand hashmap + assertNull(map.put(4, "v4")); + assertNull(map.put(5, "v5")); + assertTrue(map.capacity() == 8); + + //verify that the map does not keep shrinking at every remove() operation + assertNull(map.put(6, "v6")); + assertTrue(map.remove(6, "v6")); + assertTrue(map.capacity() == 8); + } + @Test public void testRemove() { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .build(); assertTrue(map.isEmpty()); assertNull(map.put(1, "one")); @@ -115,7 +180,10 @@ public class ConcurrentLongHashMapTest { @Test public void testNegativeUsedBucketCount() { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); map.put(0, "zero"); assertEquals(1, map.getUsedBucketCount()); @@ -130,7 +198,10 @@ public class ConcurrentLongHashMapTest { @Test public void testRehashing() { int n = 16; - ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1); + ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -145,7 +216,10 @@ public class ConcurrentLongHashMapTest { @Test public void testRehashingWithDeletes() { int n = 16; - ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1); + ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -167,7 +241,8 @@ public class ConcurrentLongHashMapTest { @Test public void concurrentInsertions() throws Throwable { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -201,7 +276,8 @@ public class ConcurrentLongHashMapTest { @Test public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -235,7 +311,10 @@ public class ConcurrentLongHashMapTest { @Test public void stressConcurrentInsertionsAndReads() throws Throwable { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(4) + .concurrencyLevel(1) + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int writeThreads = 16; @@ -286,7 +365,8 @@ public class ConcurrentLongHashMapTest { @Test public void testIteration() { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .build(); assertEquals(map.keys(), Collections.emptyList()); assertEquals(map.values(), Collections.emptyList()); @@ -330,7 +410,10 @@ public class ConcurrentLongHashMapTest { @Test public void testHashConflictWithDeletion() { final int Buckets = 16; - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(Buckets) + .concurrencyLevel(1) + .build(); // Pick 2 keys that fall into the same bucket long key1 = 1; @@ -363,7 +446,8 @@ public class ConcurrentLongHashMapTest { @Test public void testPutIfAbsent() { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .build(); assertNull(map.putIfAbsent(1, "one")); assertEquals(map.get(1), "one"); @@ -373,7 +457,10 @@ public class ConcurrentLongHashMapTest { @Test public void testComputeIfAbsent() { - ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); AtomicInteger counter = new AtomicInteger(); LongFunction<Integer> provider = key -> counter.getAndIncrement(); @@ -395,7 +482,10 @@ public class ConcurrentLongHashMapTest { static final int N = 100_000; public void benchConcurrentLongHashMap() throws Exception { - ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1); + ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder() + .expectedItems(N) + .concurrencyLevel(1) + .build(); for (long i = 0; i < Iterations; i++) { for (int j = 0; j < N; j++) {
