This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 74b00a1 support shrink for ConcurrentLong map or set (#3074)
74b00a1 is described below
commit 74b00a101cb6ffd06dafccfe9d31c8c5be66efaf
Author: lin chen <[email protected]>
AuthorDate: Fri Mar 4 01:42:10 2022 +0800
support shrink for ConcurrentLong map or set (#3074)
* support shrink for ConcurrentLong map or set
* fix unit test
* check style
* add shrink unit test.
* fix unit test
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 3 +-
.../EntryLogManagerForEntryLogPerLedger.java | 3 +-
.../apache/bookkeeper/bookie/EntryLogMetadata.java | 5 +-
.../bookkeeper/bookie/FileInfoBackingCache.java | 3 +-
.../bookkeeper/bookie/HandleFactoryImpl.java | 4 +-
.../bookie/storage/ldb/EntryLocationIndex.java | 2 +-
.../bookie/storage/ldb/LedgerMetadataIndex.java | 2 +-
.../ldb/SingleDirectoryDbLedgerStorage.java | 6 +-
.../bookkeeper/bookie/storage/ldb/WriteCache.java | 8 +-
.../bookkeeper/proto/PerChannelBookieClient.java | 2 +-
.../util/collections/ConcurrentLongHashMap.java | 156 ++++++++++++++++--
.../util/collections/ConcurrentLongHashSet.java | 137 ++++++++++++++--
.../collections/ConcurrentLongLongHashMap.java | 182 ++++++++++++++++++---
.../util/collections/ConcurrentOpenHashMap.java | 155 ++++++++++++++++--
.../util/collections/ConcurrentOpenHashSet.java | 140 ++++++++++++++--
.../collections/ConcurrentLongHashMapTest.java | 125 ++++++++++++--
.../collections/ConcurrentLongHashSetTest.java | 87 ++++++++--
.../collections/ConcurrentLongLongHashMapTest.java | 130 ++++++++++++---
.../collections/ConcurrentOpenHashMapTest.java | 116 +++++++++++--
.../collections/ConcurrentOpenHashSetTest.java | 102 ++++++++++--
.../metadata/etcd/EtcdLedgerManager.java | 2 +-
.../bookkeeper/metadata/etcd/EtcdWatchClient.java | 4 +-
.../impl/routing/RangeRoutingTableImpl.java | 5 +-
.../storage/impl/sc/ZkStorageContainerManager.java | 2 +-
.../bookie/ReadLogMetadataCommandTest.java | 2 +-
25 files changed, 1212 insertions(+), 171 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index ff0d15d..b382c4d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -111,7 +111,8 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
private int exitCode = ExitCode.OK;
- private final ConcurrentLongHashMap<byte[]> masterKeyCache = new
ConcurrentLongHashMap<>();
+ private final ConcurrentLongHashMap<byte[]> masterKeyCache =
+ ConcurrentLongHashMap.<byte[]>newBuilder().build();
protected StateManager stateManager;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 5e123dc..c04ccbe 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -275,7 +275,8 @@ class EntryLogManagerForEntryLogPerLedger extends
EntryLogManagerBase {
super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
this.rotatedLogChannels = new
CopyOnWriteArrayList<BufferedLogChannel>();
- this.replicaOfCurrentLogChannels = new
ConcurrentLongHashMap<BufferedLogChannelWithDirInfo>();
+ this.replicaOfCurrentLogChannels =
+
ConcurrentLongHashMap.<BufferedLogChannelWithDirInfo>newBuilder().build();
this.entrylogMapAccessExpiryTimeInSeconds =
conf.getEntrylogMapAccessExpiryTimeInSeconds();
this.maximumNumberOfActiveEntryLogs =
conf.getMaximumNumberOfActiveEntryLogs();
this.entryLogPerLedgerCounterLimitsMultFactor =
conf.getEntryLogPerLedgerCounterLimitsMultFactor();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
index 6dab68c..0445033 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -44,7 +44,10 @@ public class EntryLogMetadata {
private static final short DEFAULT_SERIALIZATION_VERSION = 0;
protected EntryLogMetadata() {
- ledgersMap = new ConcurrentLongLongHashMap(256, 1);
+ ledgersMap = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(256)
+ .concurrencyLevel(1)
+ .build();
}
public EntryLogMetadata(long logId) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 078292f..887cd70 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -33,7 +33,8 @@ class FileInfoBackingCache {
static final int DEAD_REF = -0xdead;
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- final ConcurrentLongHashMap<CachedFileInfo> fileInfos = new
ConcurrentLongHashMap<>();
+ final ConcurrentLongHashMap<CachedFileInfo> fileInfos =
+ ConcurrentLongHashMap.<CachedFileInfo>newBuilder().build();
final FileLoader fileLoader;
final int fileInfoVersionToWrite;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index 2bc72e2..ce03b59 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -34,8 +34,8 @@ class HandleFactoryImpl implements HandleFactory,
LedgerDeletionListener {
HandleFactoryImpl(LedgerStorage ledgerStorage) {
this.ledgerStorage = ledgerStorage;
- this.ledgers = new ConcurrentLongHashMap<>();
- this.readOnlyLedgers = new ConcurrentLongHashMap<>();
+ this.ledgers =
ConcurrentLongHashMap.<LedgerDescriptor>newBuilder().build();
+ this.readOnlyLedgers =
ConcurrentLongHashMap.<LedgerDescriptor>newBuilder().build();
ledgerStorage.registerLedgerDeletionListener(this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 6b7a20d..792d96d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
public class EntryLocationIndex implements Closeable {
private final KeyValueStorage locationsDb;
- private final ConcurrentLongHashSet deletedLedgers = new
ConcurrentLongHashSet();
+ private final ConcurrentLongHashSet deletedLedgers =
ConcurrentLongHashSet.newBuilder().build();
private final EntryLocationIndexStats stats;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
index eb7548c..df79c1b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -72,7 +72,7 @@ public class LedgerMetadataIndex implements Closeable {
StatsLogger stats) throws IOException {
ledgersDb = storageFactory.newKeyValueStorage(basePath, "ledgers",
DbConfigType.Small, conf);
- ledgers = new ConcurrentLongHashMap<>();
+ ledgers = ConcurrentLongHashMap.<LedgerData>newBuilder().build();
ledgersCount = new AtomicInteger();
// Read all ledgers from db
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index ce8af36..d13962f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -176,8 +176,10 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
entryLocationIndex = new EntryLocationIndex(conf,
KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger);
- transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
- Runtime.getRuntime().availableProcessors() * 2);
+ transientLedgerInfoCache =
ConcurrentLongHashMap.<TransientLedgerInfo>newBuilder()
+ .expectedItems(16 * 1024)
+ .concurrencyLevel(Runtime.getRuntime().availableProcessors() *
2)
+ .build();
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo,
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
TimeUnit.MINUTES);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index fedb61a..659162e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -65,8 +65,10 @@ public class WriteCache implements Closeable {
.concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
.build();
- private final ConcurrentLongLongHashMap lastEntryMap =
- new ConcurrentLongLongHashMap(4096, 2 *
Runtime.getRuntime().availableProcessors());
+ private final ConcurrentLongLongHashMap lastEntryMap =
ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(4096)
+ .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+ .build();
private final ByteBuf[] cacheSegments;
private final int segmentsCount;
@@ -80,7 +82,7 @@ public class WriteCache implements Closeable {
private final AtomicLong cacheOffset = new AtomicLong(0);
private final LongAdder cacheCount = new LongAdder();
- private final ConcurrentLongHashSet deletedLedgers = new
ConcurrentLongHashSet();
+ private final ConcurrentLongHashSet deletedLedgers =
ConcurrentLongHashSet.newBuilder().build();
private final ByteBufAllocator allocator;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 19ad306..ab054a5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -181,7 +181,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final int startTLSTimeout;
private final ConcurrentOpenHashMap<CompletionKey, CompletionValue>
completionObjects =
- new ConcurrentOpenHashMap<CompletionKey, CompletionValue>();
+ ConcurrentOpenHashMap.<CompletionKey,
CompletionValue>newBuilder().build();
// Map that hold duplicated read requests. The idea is to only use this
map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 8993b57..d7ee024 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -47,11 +47,76 @@ public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
+
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentLongHashMap.
+ */
+ public static class Builder<T> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<T> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<T> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<T> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<T> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<T> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<T> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<T> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongHashMap<T> build() {
+ return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
+
/**
* Predicate specialization for (long, V) types.
*
@@ -63,26 +128,42 @@ public class ConcurrentLongHashMap<V> {
private final Section<V>[] sections;
+ @Deprecated
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float
shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems /
MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems /
mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -219,17 +300,32 @@ public class ConcurrentLongHashMap<V> {
private volatile V[] values;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
V get(long key, int keyHash) {
@@ -343,9 +439,10 @@ public class ConcurrentLongHashMap<V> {
++bucket;
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -408,7 +505,20 @@ public class ConcurrentLongHashMap<V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -444,7 +554,20 @@ public class ConcurrentLongHashMap<V> {
return removedCount;
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -456,6 +579,9 @@ public class ConcurrentLongHashMap<V> {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -508,9 +634,8 @@ public class ConcurrentLongHashMap<V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];
@@ -529,7 +654,8 @@ public class ConcurrentLongHashMap<V> {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <V> void insertKeyValueNoLock(long[] keys, V[] values,
long key, V value) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index acb5573..5627f8b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -45,8 +45,74 @@ public class ConcurrentLongHashSet {
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section[] sections;
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of ConcurrentLongHashSet.
+ */
+ public static class Builder {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongHashSet build() {
+ return new ConcurrentLongHashSet(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
+
/**
* A consumer of long values.
*/
@@ -54,18 +120,33 @@ public class ConcurrentLongHashSet {
void accept(long item);
}
+ @Deprecated
public ConcurrentLongHashSet() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongHashSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float
shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
@@ -73,7 +154,8 @@ public class ConcurrentLongHashSet {
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section(perSectionCapacity);
+ sections[i] = new Section(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -169,16 +251,31 @@ public class ConcurrentLongHashSet {
private volatile long[] table;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new long[this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
Arrays.fill(table, EmptyItem);
}
@@ -263,9 +360,11 @@ public class ConcurrentLongHashSet {
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -296,7 +395,20 @@ public class ConcurrentLongHashSet {
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -327,6 +439,9 @@ public class ConcurrentLongHashSet {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -371,9 +486,8 @@ public class ConcurrentLongHashSet {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
long[] newTable = new long[newCapacity];
Arrays.fill(newTable, EmptyItem);
@@ -390,7 +504,8 @@ public class ConcurrentLongHashSet {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * SetFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity,
long item) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index c5d1c7b..2e773b2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -47,13 +47,76 @@ public class ConcurrentLongLongHashMap {
private static final long ValueNotFound = -1L;
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section[] sections;
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of ConcurrentLongLongHashMap.
+ */
+ public static class Builder {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongLongHashMap build() {
+ return new ConcurrentLongLongHashMap(expectedItems,
concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
/**
* A Long-Long BiConsumer.
*/
@@ -75,26 +138,42 @@ public class ConcurrentLongLongHashMap {
boolean test(long key, long value);
}
+ @Deprecated
public ConcurrentLongLongHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor,
float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems /
MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems /
mapFillFactor);
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section(perSectionCapacity);
+ sections[i] = new Section(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -288,16 +367,31 @@ public class ConcurrentLongLongHashMap {
private volatile long[] table;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new long[2 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
Arrays.fill(table, EmptyKey);
}
@@ -395,9 +489,11 @@ public class ConcurrentLongLongHashMap {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -451,9 +547,11 @@ public class ConcurrentLongLongHashMap {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -509,9 +607,11 @@ public class ConcurrentLongLongHashMap {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -547,7 +647,20 @@ public class ConcurrentLongLongHashMap {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -572,7 +685,20 @@ public class ConcurrentLongLongHashMap {
return removedCount;
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -598,7 +724,20 @@ public class ConcurrentLongLongHashMap {
return removedCount;
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -631,6 +770,9 @@ public class ConcurrentLongLongHashMap {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -677,9 +819,8 @@ public class ConcurrentLongLongHashMap {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
long[] newTable = new long[2 * newCapacity];
Arrays.fill(newTable, EmptyKey);
@@ -697,7 +838,8 @@ public class ConcurrentLongLongHashMap {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity,
long key, long value) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index 8610833..e9e94c8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -46,33 +46,112 @@ public class ConcurrentOpenHashMap<K, V> {
private static final Object EmptyKey = null;
private static final Object DeletedKey = new Object();
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section<K, V>[] sections;
+ public static <K, V> Builder<K, V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentOpenHashMap.
+ */
+ public static class Builder<K, V> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<K, V> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<K, V> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<K, V> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<K, V> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<K, V> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<K, V> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<K, V> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentOpenHashMap<K, V> build() {
+ return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
+ @Deprecated
public ConcurrentOpenHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentOpenHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float
shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems /
MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems /
mapFillFactor);
this.sections = (Section<K, V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -197,16 +276,31 @@ public class ConcurrentOpenHashMap<K, V> {
private volatile Object[] table;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new Object[2 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
V get(K key, int keyHash) {
@@ -303,9 +397,11 @@ public class ConcurrentOpenHashMap<K, V> {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -340,7 +436,20 @@ public class ConcurrentOpenHashMap<K, V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -351,6 +460,9 @@ public class ConcurrentOpenHashMap<K, V> {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -419,7 +531,20 @@ public class ConcurrentOpenHashMap<K, V> {
return removedCount;
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -446,9 +571,8 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
Object[] newTable = new Object[2 * newCapacity];
// Re-hash table
@@ -465,7 +589,8 @@ public class ConcurrentOpenHashMap<K, V> {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <K, V> void insertKeyValueNoLock(Object[] table, int
capacity, K key, V value) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index 83a953d..35fce48 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -44,33 +44,112 @@ public class ConcurrentOpenHashSet<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section<V>[] sections;
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentOpenHashSet.
+ */
+ public static class Builder<V> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<V> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<V> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<V> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<V> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<V> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<V> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<V> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentOpenHashSet<V> build() {
+ return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
+ @Deprecated
public ConcurrentOpenHashSet() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentOpenHashSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float
shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems /
MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems /
mapFillFactor);
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -151,16 +230,31 @@ public class ConcurrentOpenHashSet<V> {
private volatile V[] values;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
boolean contains(V value, int keyHash) {
@@ -256,9 +350,11 @@ public class ConcurrentOpenHashSet<V> {
++bucket;
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -309,7 +405,20 @@ public class ConcurrentOpenHashSet<V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -320,6 +429,9 @@ public class ConcurrentOpenHashSet<V> {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -368,9 +480,8 @@ public class ConcurrentOpenHashSet<V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
V[] newValues = (V[]) new Object[newCapacity];
// Re-hash table
@@ -386,7 +497,8 @@ public class ConcurrentOpenHashSet<V> {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <V> void insertValueNoLock(V[] values, V value) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index cec38cd..b675488 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -52,21 +52,29 @@ public class ConcurrentLongHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongHashMap<String>(0);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashMap<String>(16, 0);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashMap<String>(4, 8);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -74,8 +82,64 @@ public class ConcurrentLongHashMapTest {
}
@Test
+ public void testClear() {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put(1, "v1"));
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put(1, "v1"));
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1, "v1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2, "v2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertNull(map.put(4, "v4"));
+ assertNull(map.put(5, "v5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertNull(map.put(6, "v6"));
+ assertTrue(map.remove(6, "v6"));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void simpleInsertions() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -103,7 +167,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRemove() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map =
+ ConcurrentLongHashMap.<String>newBuilder().build();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -119,7 +184,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRemoveIf() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(1L, "one");
map.put(2L, "two");
@@ -136,7 +204,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testNegativeUsedBucketCount() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(0, "zero");
assertEquals(1, map.getUsedBucketCount());
@@ -151,7 +222,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n /
2, 1);
+ ConcurrentLongHashMap<Integer> map =
ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -166,7 +240,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n /
2, 1);
+ ConcurrentLongHashMap<Integer> map =
ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -188,7 +265,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map =
+ ConcurrentLongHashMap.<String>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -223,7 +301,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map =
+ ConcurrentLongHashMap.<String>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -258,7 +337,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void stressConcurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+ ConcurrentLongHashMap<String> map =
+
ConcurrentLongHashMap.<String>newBuilder().expectedItems(4).concurrencyLevel(1).build();
ExecutorService executor = Executors.newCachedThreadPool();
final int writeThreads = 16;
@@ -325,7 +405,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void testIteration() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map =
+ ConcurrentLongHashMap.<String>newBuilder().build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -369,7 +450,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testHashConflictWithDeletion() {
final int buckets = 16;
- ConcurrentLongHashMap<String> map = new
ConcurrentLongHashMap<>(buckets, 1);
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -402,7 +486,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map =
+ ConcurrentLongHashMap.<String>newBuilder().build();
assertEquals(map.putIfAbsent(1, "one"), null);
assertEquals(map.get(1), "one");
@@ -412,7 +497,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testComputeIfAbsent() {
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16,
1);
+ ConcurrentLongHashMap<Integer> map =
ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = new LongFunction<Integer>() {
public Integer apply(long key) {
@@ -439,7 +527,10 @@ public class ConcurrentLongHashMapTest {
public void benchConcurrentLongHashMap() throws Exception {
// public static void main(String args[]) {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(N)
+ .concurrencyLevel(1)
+ .build();
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
index 34cf470..cce7144 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -46,21 +46,21 @@ public class ConcurrentLongHashSetTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongHashSet(0);
+ ConcurrentLongHashSet.newBuilder().concurrencyLevel(0).build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashSet(16, 0);
+
ConcurrentLongHashSet.newBuilder().expectedItems(16).concurrencyLevel(0).build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashSet(4, 8);
+
ConcurrentLongHashSet.newBuilder().expectedItems(4).concurrencyLevel(8).build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -69,7 +69,9 @@ public class ConcurrentLongHashSetTest {
@Test
public void simpleInsertions() {
- ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(set.isEmpty());
assertTrue(set.add(1));
@@ -97,7 +99,7 @@ public class ConcurrentLongHashSetTest {
@Test
public void testRemove() {
- ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
assertTrue(set.isEmpty());
assertTrue(set.add(1));
@@ -112,7 +114,10 @@ public class ConcurrentLongHashSetTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -127,7 +132,10 @@ public class ConcurrentLongHashSetTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -149,7 +157,7 @@ public class ConcurrentLongHashSetTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -183,7 +191,7 @@ public class ConcurrentLongHashSetTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongHashSet map = new ConcurrentLongHashSet();
+ ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -216,8 +224,62 @@ public class ConcurrentLongHashSetTest {
}
@Test
+ public void testClear() {
+ ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add(1));
+ assertTrue(map.add(2));
+ assertTrue(map.add(3));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add(1));
+ assertTrue(map.add(2));
+ assertTrue(map.add(3));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.add(4));
+ assertTrue(map.add(5));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertTrue(map.add(6));
+ assertTrue(map.remove(6));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void testIteration() {
- ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder().build();
assertEquals(set.items(), Collections.emptySet());
@@ -244,7 +306,10 @@ public class ConcurrentLongHashSetTest {
@Test
public void testHashConflictWithDeletion() {
final int buckets = 16;
- ConcurrentLongHashSet set = new ConcurrentLongHashSet(buckets, 1);
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
index abd96ec..100a1e1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -49,21 +49,29 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongLongHashMap(0);
+ ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongLongHashMap(16, 0);
+ ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongLongHashMap(4, 8);
+ ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -72,7 +80,9 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void simpleInsertions() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertEquals(map.put(1, 11), -1);
@@ -99,8 +109,60 @@ public class ConcurrentLongLongHashMapTest {
}
@Test
+ public void testClear() {
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.put(1, 1) == -1);
+ assertTrue(map.put(2, 2) == -1);
+ assertTrue(map.put(3, 3) == -1);
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.put(1, 1) == -1);
+ assertTrue(map.put(2, 2) == -1);
+ assertTrue(map.put(3, 3) == -1);
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1, 1));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2, 2));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.put(4, 4) == -1);
+ assertTrue(map.put(5, 5) == -1);
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertTrue(map.put(6, 6) == -1);
+ assertTrue(map.remove(6, 6));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void testRemove() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
assertTrue(map.isEmpty());
assertEquals(map.put(1, 11), -1);
@@ -116,7 +178,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testNegativeUsedBucketCount() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(0, 0);
assertEquals(1, map.getUsedBucketCount());
@@ -131,7 +196,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2,
1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -146,7 +214,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2,
1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -168,7 +239,7 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -203,7 +274,7 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -238,7 +309,7 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testIteration() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -282,7 +353,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testHashConflictWithDeletion() {
final int buckets = 16;
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(buckets,
1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -320,7 +394,7 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
assertEquals(map.putIfAbsent(1, 11), -1);
assertEquals(map.get(1), 11);
@@ -330,7 +404,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testComputeIfAbsent() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
AtomicLong counter = new AtomicLong();
LongLongFunction provider = new LongLongFunction() {
public long apply(long key) {
@@ -353,7 +430,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testAddAndGet() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.addAndGet(0, 0), 0);
assertEquals(map.containsKey(0), true);
@@ -382,7 +462,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testRemoveIf() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(1L, 1L);
map.put(2L, 2L);
@@ -399,7 +482,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testRemoveIfValue() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(1L, 1L);
map.put(2L, 2L);
@@ -416,7 +502,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testIvalidKeys() {
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
try {
map.put(-5, 4);
@@ -461,7 +550,10 @@ public class ConcurrentLongLongHashMapTest {
@Test
public void testAsMap() {
- ConcurrentLongLongHashMap lmap = new ConcurrentLongLongHashMap(16, 1);
+ ConcurrentLongLongHashMap lmap = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
lmap.put(1, 11);
lmap.put(2, 22);
lmap.put(3, 33);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
index 20ad0bd..e1ec4f8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
@@ -52,21 +52,27 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentOpenHashMap<String, String>(0);
+ ConcurrentOpenHashMap.<String,
String>newBuilder().expectedItems(0).build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashMap<String, String>(16, 0);
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashMap<String, String>(4, 8);
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -75,7 +81,9 @@ public class ConcurrentOpenHashMapTest {
@Test
public void simpleInsertions() {
- ConcurrentOpenHashMap<String, String> map = new
ConcurrentOpenHashMap<>(16);
+ ConcurrentOpenHashMap<String, String> map =
ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertNull(map.put("1", "one"));
@@ -102,8 +110,63 @@ public class ConcurrentOpenHashMapTest {
}
@Test
+ public void testClear() {
+ ConcurrentOpenHashMap<String, String> map =
ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put("k1", "v1"));
+ assertNull(map.put("k2", "v2"));
+ assertNull(map.put("k3", "v3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentOpenHashMap<String, String> map =
ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put("k1", "v1"));
+ assertNull(map.put("k2", "v2"));
+ assertNull(map.put("k3", "v3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove("k1", "v1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove("k2", "v2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertNull(map.put("k4", "v4"));
+ assertNull(map.put("k5", "v5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertNull(map.put("k6", "v6"));
+ assertTrue(map.remove("k6", "v6"));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void testRemove() {
- ConcurrentOpenHashMap<String, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, String> map =
+ ConcurrentOpenHashMap.<String, String>newBuilder().build();
assertTrue(map.isEmpty());
assertNull(map.put("1", "one"));
@@ -120,7 +183,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentOpenHashMap<String, Integer> map = new
ConcurrentOpenHashMap<>(n / 2, 1);
+ ConcurrentOpenHashMap<String, Integer> map =
ConcurrentOpenHashMap.<String, Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -135,7 +201,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentOpenHashMap<Integer, Integer> map = new
ConcurrentOpenHashMap<>(n / 2, 1);
+ ConcurrentOpenHashMap<Integer, Integer> map =
ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -157,7 +226,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -192,7 +262,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -227,7 +298,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testIteration() {
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -271,7 +343,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testHashConflictWithDeletion() {
final int buckets = 16;
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>(buckets, 1);
+ ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long,
String>newBuilder()
+ .expectedItems(buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -304,7 +379,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
assertEquals(map.putIfAbsent(1L, "one"), null);
assertEquals(map.get(1L), "one");
@@ -314,7 +390,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testComputeIfAbsent() {
- ConcurrentOpenHashMap<Integer, Integer> map = new
ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<Integer, Integer> map =
ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
AtomicInteger counter = new AtomicInteger();
Function<Integer, Integer> provider = key -> counter.getAndIncrement();
@@ -333,7 +412,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testRemoval() {
- ConcurrentOpenHashMap<Integer, String> map = new
ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Integer, String> map =
+ ConcurrentOpenHashMap.<Integer, String>newBuilder().build();
map.put(0, "0");
map.put(1, "1");
map.put(3, "3");
@@ -381,7 +461,8 @@ public class ConcurrentOpenHashMapTest {
}
}
- ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<T, String> map =
+ ConcurrentOpenHashMap.<T, String>newBuilder().build();
T t1 = new T(1);
T t1B = new T(1);
@@ -407,7 +488,10 @@ public class ConcurrentOpenHashMapTest {
public void benchConcurrentOpenHashMap() throws Exception {
// public static void main(String args[]) {
- ConcurrentOpenHashMap<Long, String> map = new
ConcurrentOpenHashMap<>(N, 1);
+ ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long,
String>newBuilder()
+ .expectedItems(N)
+ .concurrencyLevel(1)
+ .build();
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
index d4f74f9..72dd93f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
@@ -45,21 +45,29 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testConstructor() {
try {
- new ConcurrentOpenHashSet<String>(0);
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashSet<String>(16, 0);
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(14)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashSet<String>(4, 8);
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -68,7 +76,9 @@ public class ConcurrentOpenHashSetTest {
@Test
public void simpleInsertions() {
- ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>(16);
+ ConcurrentOpenHashSet<String> set =
ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(set.isEmpty());
assertTrue(set.add("1"));
@@ -95,8 +105,63 @@ public class ConcurrentOpenHashSetTest {
}
@Test
+ public void testClear() {
+ ConcurrentOpenHashSet<String> map =
ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add("k1"));
+ assertTrue(map.add("k2"));
+ assertTrue(map.add("k3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentOpenHashSet<String> map =
ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add("k1"));
+ assertTrue(map.add("k2"));
+ assertTrue(map.add("k3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove("k1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove("k2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.add("k4"));
+ assertTrue(map.add("k5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertTrue(map.add("k6"));
+ assertTrue(map.remove("k6"));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void testRemove() {
- ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<String> set =
+ ConcurrentOpenHashSet.<String>newBuilder().build();
assertTrue(set.isEmpty());
assertTrue(set.add("1"));
@@ -111,7 +176,10 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n /
2, 1);
+ ConcurrentOpenHashSet<Integer> set =
ConcurrentOpenHashSet.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -126,7 +194,10 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n /
2, 1);
+ ConcurrentOpenHashSet<Integer> set =
ConcurrentOpenHashSet.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -148,7 +219,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> set =
+ ConcurrentOpenHashSet.<Long>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -182,7 +254,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> map =
+ ConcurrentOpenHashSet.<Long>newBuilder().build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -216,7 +289,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testIteration() {
- ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> set =
+ ConcurrentOpenHashSet.<Long>newBuilder().build();
assertEquals(set.values(), Collections.emptyList());
@@ -243,7 +317,10 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testHashConflictWithDeletion() {
final int buckets = 16;
- ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>(buckets,
1);
+ ConcurrentOpenHashSet<Long> set =
ConcurrentOpenHashSet.<Long>newBuilder()
+ .expectedItems(buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -298,7 +375,8 @@ public class ConcurrentOpenHashSetTest {
}
}
- ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<T> set =
+ ConcurrentOpenHashSet.<T>newBuilder().build();
T t1 = new T(1);
T t1B = new T(1);
diff --git
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index 94f06cb..d8a0e52 100644
---
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -73,7 +73,7 @@ class EtcdLedgerManager implements LedgerManager {
private final KV kvClient;
private final EtcdWatchClient watchClient;
private final ConcurrentLongHashMap<ValueStream<LedgerMetadata>> watchers =
- new ConcurrentLongHashMap<>();
+
ConcurrentLongHashMap.<ValueStream<LedgerMetadata>>newBuilder().build();
private final ConcurrentMap<LedgerMetadataListener,
LedgerMetadataConsumer> listeners =
new ConcurrentHashMap<>();
diff --git
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
index 381ebfa..8fbe0c7 100644
---
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
+++
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.java
@@ -66,9 +66,9 @@ public class EtcdWatchClient implements AutoCloseable {
private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;
// watchers stores a mapping between watchID -> EtcdWatcher.
private final ConcurrentLongHashMap<EtcdWatcher> watchers =
- new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap.<EtcdWatcher>newBuilder().build();
private final LinkedList<EtcdWatcher> pendingWatchers = new LinkedList<>();
- private final ConcurrentLongHashSet cancelSet = new
ConcurrentLongHashSet();
+ private final ConcurrentLongHashSet cancelSet =
ConcurrentLongHashSet.newBuilder().build();
// scheduler
private final OrderedScheduler scheduler;
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
index b9727c9..13fd547 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -43,8 +43,9 @@ public class RangeRoutingTableImpl implements
RangeRoutingTable {
public RangeRoutingTableImpl(StorageServerClientManager manager) {
this.manager = manager;
- this.ranges = new ConcurrentLongHashMap<>();
- this.outstandingUpdates = new ConcurrentLongHashMap<>();
+ this.ranges =
ConcurrentLongHashMap.<RangeRouter<byte[]>>newBuilder().build();
+ this.outstandingUpdates =
+
ConcurrentLongHashMap.<CompletableFuture<RangeRouter<byte[]>>>newBuilder().build();
}
@VisibleForTesting
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
index f4fdc4d..8684a8a 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
@@ -92,7 +92,7 @@ public class ZkStorageContainerManager
new
ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
this.liveContainers =
Collections.synchronizedMap(Maps.newConcurrentMap());
this.pendingStartStopContainers =
Collections.synchronizedSet(Sets.newConcurrentHashSet());
- this.containerAssignmentMap = new ConcurrentLongHashMap<>();
+ this.containerAssignmentMap =
ConcurrentLongHashMap.<Endpoint>newBuilder().build();
this.clusterAssignmentMap = Maps.newHashMap();
// probe the containers every 1/2 of controller scheduling interval.
this ensures the manager
// can attempt to start containers before controller reassign them.
diff --git
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
index 2c605f7..4543168 100644
---
a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
+++
b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
@@ -53,7 +53,7 @@ public class ReadLogMetadataCommandTest extends
BookieCommandTestBase {
when(entryLogger.getEntryLogMetadata(anyLong())).thenReturn(entryLogMetadata);
});
- ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ ConcurrentLongLongHashMap map =
ConcurrentLongLongHashMap.newBuilder().build();
map.put(1, 1);
when(entryLogMetadata.getLedgersMap()).thenReturn(map);