This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b00a1  support shrink for ConcurrentLong map or set (#3074)
74b00a1 is described below

commit 74b00a101cb6ffd06dafccfe9d31c8c5be66efaf
Author: lin chen <[email protected]>
AuthorDate: Fri Mar 4 01:42:10 2022 +0800

    support shrink for ConcurrentLong map or set (#3074)
    
    * support shrink for ConcurrentLong map or set
    
    * fix unit test
    
    * check style
    
    * add shrink unit test.
    
    * fix unit test
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |   3 +-
 .../EntryLogManagerForEntryLogPerLedger.java       |   3 +-
 .../apache/bookkeeper/bookie/EntryLogMetadata.java |   5 +-
 .../bookkeeper/bookie/FileInfoBackingCache.java    |   3 +-
 .../bookkeeper/bookie/HandleFactoryImpl.java       |   4 +-
 .../bookie/storage/ldb/EntryLocationIndex.java     |   2 +-
 .../bookie/storage/ldb/LedgerMetadataIndex.java    |   2 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   6 +-
 .../bookkeeper/bookie/storage/ldb/WriteCache.java  |   8 +-
 .../bookkeeper/proto/PerChannelBookieClient.java   |   2 +-
 .../util/collections/ConcurrentLongHashMap.java    | 156 ++++++++++++++++--
 .../util/collections/ConcurrentLongHashSet.java    | 137 ++++++++++++++--
 .../collections/ConcurrentLongLongHashMap.java     | 182 ++++++++++++++++++---
 .../util/collections/ConcurrentOpenHashMap.java    | 155 ++++++++++++++++--
 .../util/collections/ConcurrentOpenHashSet.java    | 140 ++++++++++++++--
 .../collections/ConcurrentLongHashMapTest.java     | 125 ++++++++++++--
 .../collections/ConcurrentLongHashSetTest.java     |  87 ++++++++--
 .../collections/ConcurrentLongLongHashMapTest.java | 130 ++++++++++++---
 .../collections/ConcurrentOpenHashMapTest.java     | 116 +++++++++++--
 .../collections/ConcurrentOpenHashSetTest.java     | 102 ++++++++++--
 .../metadata/etcd/EtcdLedgerManager.java           |   2 +-
 .../bookkeeper/metadata/etcd/EtcdWatchClient.java  |   4 +-
 .../impl/routing/RangeRoutingTableImpl.java        |   5 +-
 .../storage/impl/sc/ZkStorageContainerManager.java |   2 +-
 .../bookie/ReadLogMetadataCommandTest.java         |   2 +-
 25 files changed, 1212 insertions(+), 171 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index ff0d15d..b382c4d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -111,7 +111,8 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
 
     private int exitCode = ExitCode.OK;
 
-    private final ConcurrentLongHashMap<byte[]> masterKeyCache = new 
ConcurrentLongHashMap<>();
+    private final ConcurrentLongHashMap<byte[]> masterKeyCache =
+            ConcurrentLongHashMap.<byte[]>newBuilder().build();
 
     protected StateManager stateManager;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 5e123dc..c04ccbe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -275,7 +275,8 @@ class EntryLogManagerForEntryLogPerLedger extends 
EntryLogManagerBase {
         super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
         this.rotatedLogChannels = new 
CopyOnWriteArrayList<BufferedLogChannel>();
-        this.replicaOfCurrentLogChannels = new 
ConcurrentLongHashMap<BufferedLogChannelWithDirInfo>();
+        this.replicaOfCurrentLogChannels =
+                
ConcurrentLongHashMap.<BufferedLogChannelWithDirInfo>newBuilder().build();
         this.entrylogMapAccessExpiryTimeInSeconds = 
conf.getEntrylogMapAccessExpiryTimeInSeconds();
         this.maximumNumberOfActiveEntryLogs = 
conf.getMaximumNumberOfActiveEntryLogs();
         this.entryLogPerLedgerCounterLimitsMultFactor = 
conf.getEntryLogPerLedgerCounterLimitsMultFactor();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
index 6dab68c..0445033 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -44,7 +44,10 @@ public class EntryLogMetadata {
     private static final short DEFAULT_SERIALIZATION_VERSION = 0;
 
     protected EntryLogMetadata() {
-        ledgersMap = new ConcurrentLongLongHashMap(256, 1);
+        ledgersMap = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(256)
+                .concurrencyLevel(1)
+                .build();
     }
 
     public EntryLogMetadata(long logId) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 078292f..887cd70 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -33,7 +33,8 @@ class FileInfoBackingCache {
     static final int DEAD_REF = -0xdead;
 
     final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    final ConcurrentLongHashMap<CachedFileInfo> fileInfos = new 
ConcurrentLongHashMap<>();
+    final ConcurrentLongHashMap<CachedFileInfo> fileInfos =
+            ConcurrentLongHashMap.<CachedFileInfo>newBuilder().build();
     final FileLoader fileLoader;
     final int fileInfoVersionToWrite;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index 2bc72e2..ce03b59 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -34,8 +34,8 @@ class HandleFactoryImpl implements HandleFactory, 
LedgerDeletionListener {
 
     HandleFactoryImpl(LedgerStorage ledgerStorage) {
         this.ledgerStorage = ledgerStorage;
-        this.ledgers = new ConcurrentLongHashMap<>();
-        this.readOnlyLedgers = new ConcurrentLongHashMap<>();
+        this.ledgers = 
ConcurrentLongHashMap.<LedgerDescriptor>newBuilder().build();
+        this.readOnlyLedgers = 
ConcurrentLongHashMap.<LedgerDescriptor>newBuilder().build();
 
         ledgerStorage.registerLedgerDeletionListener(this);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 6b7a20d..792d96d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 public class EntryLocationIndex implements Closeable {
 
     private final KeyValueStorage locationsDb;
-    private final ConcurrentLongHashSet deletedLedgers = new 
ConcurrentLongHashSet();
+    private final ConcurrentLongHashSet deletedLedgers = 
ConcurrentLongHashSet.newBuilder().build();
 
     private final EntryLocationIndexStats stats;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
index eb7548c..df79c1b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -72,7 +72,7 @@ public class LedgerMetadataIndex implements Closeable {
             StatsLogger stats) throws IOException {
         ledgersDb = storageFactory.newKeyValueStorage(basePath, "ledgers", 
DbConfigType.Small, conf);
 
-        ledgers = new ConcurrentLongHashMap<>();
+        ledgers = ConcurrentLongHashMap.<LedgerData>newBuilder().build();
         ledgersCount = new AtomicInteger();
 
         // Read all ledgers from db
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index ce8af36..d13962f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -176,8 +176,10 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
         entryLocationIndex = new EntryLocationIndex(conf,
                 KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger);
 
-        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
-                Runtime.getRuntime().availableProcessors() * 2);
+        transientLedgerInfoCache = 
ConcurrentLongHashMap.<TransientLedgerInfo>newBuilder()
+                .expectedItems(16 * 1024)
+                .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 
2)
+                .build();
         
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, 
TimeUnit.MINUTES);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index fedb61a..659162e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -65,8 +65,10 @@ public class WriteCache implements Closeable {
             .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
             .build();
 
-    private final ConcurrentLongLongHashMap lastEntryMap =
-            new ConcurrentLongLongHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors());
+    private final ConcurrentLongLongHashMap lastEntryMap = 
ConcurrentLongLongHashMap.newBuilder()
+            .expectedItems(4096)
+            .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+            .build();
 
     private final ByteBuf[] cacheSegments;
     private final int segmentsCount;
@@ -80,7 +82,7 @@ public class WriteCache implements Closeable {
     private final AtomicLong cacheOffset = new AtomicLong(0);
     private final LongAdder cacheCount = new LongAdder();
 
-    private final ConcurrentLongHashSet deletedLedgers = new 
ConcurrentLongHashSet();
+    private final ConcurrentLongHashSet deletedLedgers = 
ConcurrentLongHashSet.newBuilder().build();
 
     private final ByteBufAllocator allocator;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 19ad306..ab054a5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -181,7 +181,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     final int startTLSTimeout;
 
     private final ConcurrentOpenHashMap<CompletionKey, CompletionValue> 
completionObjects =
-        new ConcurrentOpenHashMap<CompletionKey, CompletionValue>();
+            ConcurrentOpenHashMap.<CompletionKey, 
CompletionValue>newBuilder().build();
 
     // Map that hold duplicated read requests. The idea is to only use this 
map (synchronized) when there is a duplicate
     // read request for the same ledgerId/entryId
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 8993b57..d7ee024 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -47,11 +47,76 @@ public class ConcurrentLongHashMap<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
+
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentLongHashMap.
+     */
+    public static class Builder<T> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<T> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<T> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<T> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<T> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<T> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<T> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<T> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongHashMap<T> build() {
+            return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
+
     /**
      * Predicate specialization for (long, V) types.
      *
@@ -63,26 +128,42 @@ public class ConcurrentLongHashMap<V> {
 
     private final Section<V>[] sections;
 
+    @Deprecated
     public ConcurrentLongHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float 
shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / 
MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / 
mapFillFactor);
         this.sections = (Section<V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -219,17 +300,32 @@ public class ConcurrentLongHashMap<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.keys = new long[this.capacity];
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(long key, int keyHash) {
@@ -343,9 +439,10 @@ public class ConcurrentLongHashMap<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -408,7 +505,20 @@ public class ConcurrentLongHashMap<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -444,7 +554,20 @@ public class ConcurrentLongHashMap<V> {
 
                 return removedCount;
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -456,6 +579,9 @@ public class ConcurrentLongHashMap<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -508,9 +634,8 @@ public class ConcurrentLongHashMap<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newKeys = new long[newCapacity];
             V[] newValues = (V[]) new Object[newCapacity];
 
@@ -529,7 +654,8 @@ public class ConcurrentLongHashMap<V> {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertKeyValueNoLock(long[] keys, V[] values, 
long key, V value) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index acb5573..5627f8b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -45,8 +45,74 @@ public class ConcurrentLongHashSet {
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section[] sections;
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongHashSet.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongHashSet build() {
+            return new ConcurrentLongHashSet(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
+
     /**
      * A consumer of long values.
      */
@@ -54,18 +120,33 @@ public class ConcurrentLongHashSet {
         void accept(long item);
     }
 
+    @Deprecated
     public ConcurrentLongHashSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongHashSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float 
shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
@@ -73,7 +154,8 @@ public class ConcurrentLongHashSet {
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -169,16 +251,31 @@ public class ConcurrentLongHashSet {
         private volatile long[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new long[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
             Arrays.fill(table, EmptyItem);
         }
 
@@ -263,9 +360,11 @@ public class ConcurrentLongHashSet {
                     bucket = (bucket + 1) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -296,7 +395,20 @@ public class ConcurrentLongHashSet {
                     bucket = (bucket + 1) & (table.length - 1);
                 }
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -327,6 +439,9 @@ public class ConcurrentLongHashSet {
                 Arrays.fill(table, EmptyItem);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -371,9 +486,8 @@ public class ConcurrentLongHashSet {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newTable = new long[newCapacity];
             Arrays.fill(newTable, EmptyItem);
 
@@ -390,7 +504,8 @@ public class ConcurrentLongHashSet {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * SetFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, 
long item) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index c5d1c7b..2e773b2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -47,13 +47,76 @@ public class ConcurrentLongLongHashMap {
 
     private static final long ValueNotFound = -1L;
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section[] sections;
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongLongHashMap.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongLongHashMap build() {
+            return new ConcurrentLongLongHashMap(expectedItems, 
concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
     /**
      * A Long-Long BiConsumer.
      */
@@ -75,26 +138,42 @@ public class ConcurrentLongLongHashMap {
         boolean test(long key, long value);
     }
 
+    @Deprecated
     public ConcurrentLongLongHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongLongHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel,
+                                     float mapFillFactor, float mapIdleFactor,
+                                     boolean autoShrink, float expandFactor, 
float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / 
MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / 
mapFillFactor);
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -288,16 +367,31 @@ public class ConcurrentLongLongHashMap {
         private volatile long[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new long[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
             Arrays.fill(table, EmptyKey);
         }
 
@@ -395,9 +489,11 @@ public class ConcurrentLongLongHashMap {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -451,9 +547,11 @@ public class ConcurrentLongLongHashMap {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -509,9 +607,11 @@ public class ConcurrentLongLongHashMap {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -547,7 +647,20 @@ public class ConcurrentLongLongHashMap {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -572,7 +685,20 @@ public class ConcurrentLongLongHashMap {
 
                 return removedCount;
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -598,7 +724,20 @@ public class ConcurrentLongLongHashMap {
 
                 return removedCount;
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -631,6 +770,9 @@ public class ConcurrentLongLongHashMap {
                 Arrays.fill(table, EmptyKey);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -677,9 +819,8 @@ public class ConcurrentLongLongHashMap {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newTable = new long[2 * newCapacity];
             Arrays.fill(newTable, EmptyKey);
 
@@ -697,7 +838,8 @@ public class ConcurrentLongLongHashMap {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, 
long key, long value) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index 8610833..e9e94c8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -46,33 +46,112 @@ public class ConcurrentOpenHashMap<K, V> {
     private static final Object EmptyKey = null;
     private static final Object DeletedKey = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<K, V>[] sections;
 
+    public static <K, V> Builder<K, V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashMap.
+     */
+    public static class Builder<K, V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<K, V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<K, V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<K, V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<K, V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<K, V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<K, V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<K, V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashMap<K, V> build() {
+            return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float 
shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / 
MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / 
mapFillFactor);
         this.sections = (Section<K, V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -197,16 +276,31 @@ public class ConcurrentOpenHashMap<K, V> {
         private volatile Object[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new Object[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(K key, int keyHash) {
@@ -303,9 +397,11 @@ public class ConcurrentOpenHashMap<K, V> {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -340,7 +436,20 @@ public class ConcurrentOpenHashMap<K, V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -351,6 +460,9 @@ public class ConcurrentOpenHashMap<K, V> {
                 Arrays.fill(table, EmptyKey);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -419,7 +531,20 @@ public class ConcurrentOpenHashMap<K, V> {
 
                 return removedCount;
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -446,9 +571,8 @@ public class ConcurrentOpenHashMap<K, V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             Object[] newTable = new Object[2 * newCapacity];
 
             // Re-hash table
@@ -465,7 +589,8 @@ public class ConcurrentOpenHashMap<K, V> {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <K, V> void insertKeyValueNoLock(Object[] table, int 
capacity, K key, V value) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index 83a953d..35fce48 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -44,33 +44,112 @@ public class ConcurrentOpenHashSet<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<V>[] sections;
 
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashSet.
+     */
+    public static class Builder<V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashSet<V> build() {
+            return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float 
shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / 
MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / 
mapFillFactor);
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -151,16 +230,31 @@ public class ConcurrentOpenHashSet<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         boolean contains(V value, int keyHash) {
@@ -256,9 +350,11 @@ public class ConcurrentOpenHashSet<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -309,7 +405,20 @@ public class ConcurrentOpenHashSet<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -320,6 +429,9 @@ public class ConcurrentOpenHashSet<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -368,9 +480,8 @@ public class ConcurrentOpenHashSet<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             V[] newValues = (V[]) new Object[newCapacity];
 
             // Re-hash table
@@ -386,7 +497,8 @@ public class ConcurrentOpenHashSet<V> {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertValueNoLock(V[] values, V value) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index cec38cd..b675488 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -52,21 +52,29 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongHashMap<String>(0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(16, 0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(4, 8);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -74,8 +82,64 @@ public class ConcurrentLongHashMapTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, "v1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, "v2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put(4, "v4"));
+        assertNull(map.put(5, "v5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertNull(map.put(6, "v6"));
+        assertTrue(map.remove(6, "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void simpleInsertions() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -103,7 +167,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testRemove() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder().build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -119,7 +184,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testRemoveIf() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(1L, "one");
         map.put(2L, "two");
@@ -136,7 +204,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testNegativeUsedBucketCount() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(0, "zero");
         assertEquals(1, map.getUsedBucketCount());
@@ -151,7 +222,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 
2, 1);
+        ConcurrentLongHashMap<Integer> map = 
ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -166,7 +240,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 
2, 1);
+        ConcurrentLongHashMap<Integer> map = 
ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -188,7 +265,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -223,7 +301,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -258,7 +337,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void stressConcurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ConcurrentLongHashMap<String> map =
+                
ConcurrentLongHashMap.<String>newBuilder().expectedItems(4).concurrencyLevel(1).build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int writeThreads = 16;
@@ -325,7 +405,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder().build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -369,7 +450,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int buckets = 16;
-        ConcurrentLongHashMap<String> map = new 
ConcurrentLongHashMap<>(buckets, 1);
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -402,7 +486,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder().build();
         assertEquals(map.putIfAbsent(1, "one"), null);
         assertEquals(map.get(1), "one");
 
@@ -412,7 +497,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 
1);
+        ConcurrentLongHashMap<Integer> map = 
ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         LongFunction<Integer> provider = new LongFunction<Integer>() {
             public Integer apply(long key) {
@@ -439,7 +527,10 @@ public class ConcurrentLongHashMapTest {
 
     public void benchConcurrentLongHashMap() throws Exception {
         // public static void main(String args[]) {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
index 34cf470..cce7144 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -46,21 +46,21 @@ public class ConcurrentLongHashSetTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongHashSet(0);
+            ConcurrentLongHashSet.newBuilder().concurrencyLevel(0).build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashSet(16, 0);
+            
ConcurrentLongHashSet.newBuilder().expectedItems(16).concurrencyLevel(0).build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashSet(4, 8);
+            
ConcurrentLongHashSet.newBuilder().expectedItems(4).concurrencyLevel(8).build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -69,7 +69,9 @@ public class ConcurrentLongHashSetTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1));
@@ -97,7 +99,7 @@ public class ConcurrentLongHashSetTest {
 
     @Test
     public void testRemove() {
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1));
@@ -112,7 +114,10 @@ public class ConcurrentLongHashSetTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -127,7 +132,10 @@ public class ConcurrentLongHashSetTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -149,7 +157,7 @@ public class ConcurrentLongHashSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -183,7 +191,7 @@ public class ConcurrentLongHashSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashSet map = new ConcurrentLongHashSet();
+        ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -216,8 +224,62 @@ public class ConcurrentLongHashSetTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1));
+        assertTrue(map.add(2));
+        assertTrue(map.add(3));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1));
+        assertTrue(map.add(2));
+        assertTrue(map.add(3));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add(4));
+        assertTrue(map.add(5));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertTrue(map.add(6));
+        assertTrue(map.remove(6));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void testIteration() {
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
 
         assertEquals(set.items(), Collections.emptySet());
 
@@ -244,7 +306,10 @@ public class ConcurrentLongHashSetTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int buckets = 16;
-        ConcurrentLongHashSet set = new ConcurrentLongHashSet(buckets, 1);
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
index abd96ec..100a1e1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -49,21 +49,29 @@ public class ConcurrentLongLongHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongLongHashMap(0);
+            ConcurrentLongLongHashMap.newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongLongHashMap(16, 0);
+            ConcurrentLongLongHashMap.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongLongHashMap(4, 8);
+            ConcurrentLongLongHashMap.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -72,7 +80,9 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertEquals(map.put(1, 11), -1);
@@ -99,8 +109,60 @@ public class ConcurrentLongLongHashMapTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.put(1, 1) == -1);
+        assertTrue(map.put(2, 2) == -1);
+        assertTrue(map.put(3, 3) == -1);
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.put(1, 1) == -1);
+        assertTrue(map.put(2, 2) == -1);
+        assertTrue(map.put(3, 3) == -1);
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.put(4, 4) == -1);
+        assertTrue(map.put(5, 5) == -1);
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertTrue(map.put(6, 6) == -1);
+        assertTrue(map.remove(6, 6));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void testRemove() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
 
         assertTrue(map.isEmpty());
         assertEquals(map.put(1, 11), -1);
@@ -116,7 +178,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testNegativeUsedBucketCount() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(0, 0);
         assertEquals(1, map.getUsedBucketCount());
@@ -131,7 +196,10 @@ public class ConcurrentLongLongHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 
1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -146,7 +214,10 @@ public class ConcurrentLongLongHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 
1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -168,7 +239,7 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -203,7 +274,7 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -238,7 +309,7 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -282,7 +353,10 @@ public class ConcurrentLongLongHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int buckets = 16;
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(buckets, 
1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -320,7 +394,7 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
         assertEquals(map.putIfAbsent(1, 11), -1);
         assertEquals(map.get(1), 11);
 
@@ -330,7 +404,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicLong counter = new AtomicLong();
         LongLongFunction provider = new LongLongFunction() {
             public long apply(long key) {
@@ -353,7 +430,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testAddAndGet() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         assertEquals(map.addAndGet(0, 0), 0);
         assertEquals(map.containsKey(0), true);
@@ -382,7 +462,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testRemoveIf() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(1L, 1L);
         map.put(2L, 2L);
@@ -399,7 +482,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testRemoveIfValue() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(1L, 1L);
         map.put(2L, 2L);
@@ -416,7 +502,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testIvalidKeys() {
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         try {
             map.put(-5, 4);
@@ -461,7 +550,10 @@ public class ConcurrentLongLongHashMapTest {
 
     @Test
     public void testAsMap() {
-        ConcurrentLongLongHashMap lmap = new ConcurrentLongLongHashMap(16, 1);
+        ConcurrentLongLongHashMap lmap = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         lmap.put(1, 11);
         lmap.put(2, 22);
         lmap.put(3, 33);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
index 20ad0bd..e1ec4f8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
@@ -52,21 +52,27 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentOpenHashMap<String, String>(0);
+            ConcurrentOpenHashMap.<String, 
String>newBuilder().expectedItems(0).build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(16, 0);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(4, 8);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -75,7 +81,9 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentOpenHashMap<String, String> map = new 
ConcurrentOpenHashMap<>(16);
+        ConcurrentOpenHashMap<String, String> map = 
ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -102,8 +110,63 @@ public class ConcurrentOpenHashMapTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentOpenHashMap<String, String> map = 
ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashMap<String, String> map = 
ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1", "v1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2", "v2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put("k4", "v4"));
+        assertNull(map.put("k5", "v5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertNull(map.put("k6", "v6"));
+        assertTrue(map.remove("k6", "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void testRemove() {
-        ConcurrentOpenHashMap<String, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder().build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -120,7 +183,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentOpenHashMap<String, Integer> map = new 
ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<String, Integer> map = 
ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -135,7 +201,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentOpenHashMap<Integer, Integer> map = new 
ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map = 
ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -157,7 +226,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -192,7 +262,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -227,7 +298,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -271,7 +343,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int buckets = 16;
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>(buckets, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, 
String>newBuilder()
+                .expectedItems(buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -304,7 +379,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         assertEquals(map.putIfAbsent(1L, "one"), null);
         assertEquals(map.get(1L), "one");
 
@@ -314,7 +390,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentOpenHashMap<Integer, Integer> map = new 
ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map = 
ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         Function<Integer, Integer> provider = key -> counter.getAndIncrement();
 
@@ -333,7 +412,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testRemoval() {
-        ConcurrentOpenHashMap<Integer, String> map = new 
ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Integer, String> map =
+                ConcurrentOpenHashMap.<Integer, String>newBuilder().build();
         map.put(0, "0");
         map.put(1, "1");
         map.put(3, "3");
@@ -381,7 +461,8 @@ public class ConcurrentOpenHashMapTest {
             }
         }
 
-        ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<T, String> map =
+                ConcurrentOpenHashMap.<T, String>newBuilder().build();
 
         T t1 = new T(1);
         T t1B = new T(1);
@@ -407,7 +488,10 @@ public class ConcurrentOpenHashMapTest {
 
     public void benchConcurrentOpenHashMap() throws Exception {
         // public static void main(String args[]) {
-        ConcurrentOpenHashMap<Long, String> map = new 
ConcurrentOpenHashMap<>(N, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, 
String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
index d4f74f9..72dd93f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
@@ -45,21 +45,29 @@ public class ConcurrentOpenHashSetTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentOpenHashSet<String>(0);
+            ConcurrentOpenHashSet.<String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashSet<String>(16, 0);
+            ConcurrentOpenHashSet.<String>newBuilder()
+                    .expectedItems(14)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashSet<String>(4, 8);
+            ConcurrentOpenHashSet.<String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -68,7 +76,9 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>(16);
+        ConcurrentOpenHashSet<String> set = 
ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add("1"));
@@ -95,8 +105,63 @@ public class ConcurrentOpenHashSetTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentOpenHashSet<String> map = 
ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashSet<String> map = 
ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add("k4"));
+        assertTrue(map.add("k5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertTrue(map.add("k6"));
+        assertTrue(map.remove("k6"));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void testRemove() {
-        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<String> set =
+                ConcurrentOpenHashSet.<String>newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add("1"));
@@ -111,7 +176,10 @@ public class ConcurrentOpenHashSetTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n / 
2, 1);
+        ConcurrentOpenHashSet<Integer> set = 
ConcurrentOpenHashSet.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -126,7 +194,10 @@ public class ConcurrentOpenHashSetTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n / 
2, 1);
+        ConcurrentOpenHashSet<Integer> set = 
ConcurrentOpenHashSet.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -148,7 +219,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -182,7 +254,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> map =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -216,7 +289,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
 
         assertEquals(set.values(), Collections.emptyList());
 
@@ -243,7 +317,10 @@ public class ConcurrentOpenHashSetTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int buckets = 16;
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>(buckets, 
1);
+        ConcurrentOpenHashSet<Long> set = 
ConcurrentOpenHashSet.<Long>newBuilder()
+                .expectedItems(buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -298,7 +375,8 @@ public class ConcurrentOpenHashSetTest {
             }
         }
 
-        ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<T> set =
+                ConcurrentOpenHashSet.<T>newBuilder().build();
 
         T t1 = new T(1);
         T t1B = new T(1);
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index 94f06cb..d8a0e52 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -73,7 +73,7 @@ class EtcdLedgerManager implements LedgerManager {
     private final KV kvClient;
     private final EtcdWatchClient watchClient;
     private final ConcurrentLongHashMap<ValueStream<LedgerMetadata>> watchers =
-        new ConcurrentLongHashMap<>();
+            
ConcurrentLongHashMap.<ValueStream<LedgerMetadata>>newBuilder().build();
     private final ConcurrentMap<LedgerMetadataListener, 
LedgerMetadataConsumer> listeners =
         new ConcurrentHashMap<>();
 
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
index 381ebfa..8fbe0c7 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
@@ -66,9 +66,9 @@ public class EtcdWatchClient implements AutoCloseable {
     private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;
     // watchers stores a mapping between watchID -> EtcdWatcher.
     private final ConcurrentLongHashMap<EtcdWatcher> watchers =
-        new ConcurrentLongHashMap<>();
+            ConcurrentLongHashMap.<EtcdWatcher>newBuilder().build();
     private final LinkedList<EtcdWatcher> pendingWatchers = new LinkedList<>();
-    private final ConcurrentLongHashSet cancelSet = new 
ConcurrentLongHashSet();
+    private final ConcurrentLongHashSet cancelSet = 
ConcurrentLongHashSet.newBuilder().build();
 
     // scheduler
     private final OrderedScheduler scheduler;
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
index b9727c9..13fd547 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -43,8 +43,9 @@ public class RangeRoutingTableImpl implements 
RangeRoutingTable {
 
     public RangeRoutingTableImpl(StorageServerClientManager manager) {
         this.manager = manager;
-        this.ranges = new ConcurrentLongHashMap<>();
-        this.outstandingUpdates = new ConcurrentLongHashMap<>();
+        this.ranges = 
ConcurrentLongHashMap.<RangeRouter<byte[]>>newBuilder().build();
+        this.outstandingUpdates =
+                
ConcurrentLongHashMap.<CompletableFuture<RangeRouter<byte[]>>>newBuilder().build();
     }
 
     @VisibleForTesting
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
index f4fdc4d..8684a8a 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
@@ -92,7 +92,7 @@ public class ZkStorageContainerManager
             new 
ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
         this.liveContainers = 
Collections.synchronizedMap(Maps.newConcurrentMap());
         this.pendingStartStopContainers = 
Collections.synchronizedSet(Sets.newConcurrentHashSet());
-        this.containerAssignmentMap = new ConcurrentLongHashMap<>();
+        this.containerAssignmentMap = 
ConcurrentLongHashMap.<Endpoint>newBuilder().build();
         this.clusterAssignmentMap = Maps.newHashMap();
         // probe the containers every 1/2 of controller scheduling interval. 
this ensures the manager
         // can attempt to start containers before controller reassign them.
diff --git 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
index 2c605f7..4543168 100644
--- 
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
+++ 
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
@@ -53,7 +53,7 @@ public class ReadLogMetadataCommandTest extends 
BookieCommandTestBase {
             
when(entryLogger.getEntryLogMetadata(anyLong())).thenReturn(entryLogMetadata);
         });
 
-        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ConcurrentLongLongHashMap map = 
ConcurrentLongLongHashMap.newBuilder().build();
         map.put(1, 1);
         when(entryLogMetadata.getLedgersMap()).thenReturn(map);
 

Reply via email to