This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 794cdbb Optimize memory: Support shrinking in
ConcurrentLongLongPairHashMap (#3061)
794cdbb is described below
commit 794cdbb5fc6e13e19865b98a386430f103b7408a
Author: lin chen <[email protected]>
AuthorDate: Tue Feb 22 13:01:03 2022 +0800
Optimize memory: Support shrinking in ConcurrentLongLongPairHashMap (#3061)
* support shrink
* Reduce unnecessary rehash
* check style
* fix: unnecessary rehash
* add unit test: testExpandAndShrink
* fix unit test: testExpandAndShrink
* fix test:
1.verify that the map is able to expand after shrink;
2.does not keep shrinking at every remove() operation;
* 1.add builder;
2.add config:
①MapFillFactor;②MapIdleFactor;③autoShrink;④expandFactor;⑤shrinkFactor
* check style
* 1.check style;
2.add check :
shrinkFactor>1
expandFactor>1
* check style
* keep and Deprecate all the public constructors.
* add final for autoShrink
* fix unit test testExpandAndShrink, set autoShrink true
* add method for update parameters value:
①setMapFillFactor
②setMapIdleFactor
③setExpandFactor
④setShrinkFactor
⑤setAutoShrink
* use lombok.Setter replace lombok.Data
* use public for getUsedBucketCount
* 1.check parameters;
2.fix the shrinkage condition:
①newCapacity > size: in order to prevent the infinite loop of rehash,
newCapacity should be larger than the currently used size;
②newCapacity > resizeThresholdUp: in order to prevent continuous
expansion and contraction, newCapacity should be greater than the expansion
threshold;
* 1.update parameters check;
2.fix newCapacity calculation when shrinking:
rehash((int) Math.max(size / mapFillFactor, capacity / shrinkFactor));
* remove set methods:
①setMapFillFactor
②setMapIdleFactor
③setExpandFactor
④setShrinkFactor
⑤setAutoShrink
* Repair shrinkage conditions: ①newCapacity must be a power of 2;
②reduce unnecessary shrinkage;
* Repair shrinkage conditions
* add shrinkage when clear
* 1.add test for clear shrink
2. fix initCapacity value
---
.../bookkeeper/bookie/storage/ldb/ReadCache.java | 6 +-
.../bookkeeper/bookie/storage/ldb/WriteCache.java | 6 +-
.../collections/ConcurrentLongLongPairHashMap.java | 152 +++++++++++++++++++--
.../ConcurrentLongLongPairHashMapTest.java | 114 ++++++++++++++--
4 files changed, 246 insertions(+), 32 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
index f4038b4..50f4a3b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -77,7 +77,11 @@ public class ReadCache implements Closeable {
for (int i = 0; i < segmentsCount; i++) {
cacheSegments.add(Unpooled.directBuffer(segmentSize, segmentSize));
- cacheIndexes.add(new ConcurrentLongLongPairHashMap(4096, 2 *
Runtime.getRuntime().availableProcessors()));
+ ConcurrentLongLongPairHashMap concurrentLongLongPairHashMap =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(4096)
+ .concurrencyLevel(2 *
Runtime.getRuntime().availableProcessors())
+ .build();
+ cacheIndexes.add(concurrentLongLongPairHashMap);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index 66eeb3b..fedb61a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -60,8 +60,10 @@ public class WriteCache implements Closeable {
void accept(long ledgerId, long entryId, ByteBuf entry);
}
- private final ConcurrentLongLongPairHashMap index =
- new ConcurrentLongLongPairHashMap(4096, 2 *
Runtime.getRuntime().availableProcessors());
+ private final ConcurrentLongLongPairHashMap index =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(4096)
+ .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+ .build();
private final ConcurrentLongLongHashMap lastEntryMap =
new ConcurrentLongLongHashMap(4096, 2 *
Runtime.getRuntime().availableProcessors());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index 42cb04b..6775610 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -47,13 +47,77 @@ public class ConcurrentLongLongPairHashMap {
private static final long ValueNotFound = -1L;
- private static final float MapFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section[] sections;
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of ConcurrentLongLongPairHashMap.
+ */
+ public static class Builder {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongLongPairHashMap build() {
+ return new ConcurrentLongLongPairHashMap(expectedItems,
concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor,
shrinkFactor);
+ }
+ }
+
/**
* A BiConsumer Long pair.
*/
@@ -75,26 +139,42 @@ public class ConcurrentLongLongPairHashMap {
boolean test(long key1, long key2, long value1, long value2);
}
+ @Deprecated
public ConcurrentLongLongPairHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongLongPairHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongLongPairHashMap(int expectedItems, int
concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor,
DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ private ConcurrentLongLongPairHashMap(int expectedItems, int
concurrencyLevel,
+ float mapFillFactor, float
mapIdleFactor,
+ boolean autoShrink, float
expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems /
MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems /
mapFillFactor);
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section(perSectionCapacity);
+ sections[i] = new Section(perSectionCapacity, mapFillFactor,
mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -226,16 +306,31 @@ public class ConcurrentLongLongPairHashMap {
private volatile long[] table;
private volatile int capacity;
+ private final int initCapacity;
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new long[4 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
Arrays.fill(table, EmptyKey);
}
@@ -336,9 +431,11 @@ public class ConcurrentLongLongPairHashMap {
bucket = (bucket + 4) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity *
expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -376,7 +473,20 @@ public class ConcurrentLongLongPairHashMap {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity /
shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity *
mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp >
size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -388,6 +498,18 @@ public class ConcurrentLongLongPairHashMap {
table[bucket + 2] = ValueNotFound;
table[bucket + 3] = ValueNotFound;
--usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedKey` state, so
that we can reduce unnecessary expansions
+ bucket = (bucket - 4) & (table.length - 1);
+ while (table[bucket] == DeletedKey) {
+ table[bucket] = EmptyKey;
+ table[bucket + 1] = EmptyKey;
+ table[bucket + 2] = ValueNotFound;
+ table[bucket + 3] = ValueNotFound;
+ --usedBuckets;
+
+ bucket = (bucket - 4) & (table.length - 1);
+ }
} else {
table[bucket] = DeletedKey;
table[bucket + 1] = DeletedKey;
@@ -403,6 +525,9 @@ public class ConcurrentLongLongPairHashMap {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -453,9 +578,7 @@ public class ConcurrentLongLongPairHashMap {
}
}
- private void rehash() {
- // Expand the hashmap
- int newCapacity = capacity * 2;
+ private void rehash(int newCapacity) {
long[] newTable = new long[4 * newCapacity];
Arrays.fill(newTable, EmptyKey);
@@ -475,7 +598,8 @@ public class ConcurrentLongLongPairHashMap {
// Capacity needs to be updated after the values, so that we won't
see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity,
long key1, long key2, long value1,
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
index f8bfd52..b56bd31 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -48,21 +48,29 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongLongPairHashMap(0);
+ ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongLongPairHashMap(16, 0);
+ ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongLongPairHashMap(4, 8);
+ ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -71,8 +79,9 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void simpleInsertions() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap(16);
-
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertTrue(map.put(1, 1, 11, 11));
assertFalse(map.isEmpty());
@@ -99,7 +108,9 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testRemove() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap();
+ ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap
+ .newBuilder()
+ .build();
assertTrue(map.isEmpty());
assertTrue(map.put(1, 1, 11, 11));
@@ -114,8 +125,63 @@ public class ConcurrentLongLongPairHashMapTest {
}
@Test
+ public void testClear() {
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.put(1, 1, 11, 11));
+ assertTrue(map.put(2, 2, 22, 22));
+ assertTrue(map.put(3, 3, 33, 33));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.put(1, 1, 11, 11));
+ assertTrue(map.put(2, 2, 22, 22));
+ assertTrue(map.put(3, 3, 33, 33));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1, 1, 11, 11));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2, 2, 22, 22));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.put(4, 4, 44, 44));
+ assertTrue(map.put(5, 5, 55, 55));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove()
operation
+ assertTrue(map.put(6, 6, 66, 66));
+ assertTrue(map.remove(6, 6, 66, 66));
+ assertTrue(map.capacity() == 8);
+ }
+
+ @Test
public void testNegativeUsedBucketCount() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap(16, 1);
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(0, 0, 0, 0);
assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +196,10 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap(n / 2, 1);
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -145,7 +214,10 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap(n / 2, 1);
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -167,7 +239,8 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap();
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -206,7 +279,8 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap();
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .build();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
@@ -245,7 +319,8 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testIteration() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap();
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -288,7 +363,9 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap();
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .build();
+
assertTrue(map.putIfAbsent(1, 1, 11, 11));
assertEquals(map.get(1, 1), new LongPair(11, 11));
@@ -298,7 +375,11 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testIvalidKeys() {
- ConcurrentLongLongPairHashMap map = new
ConcurrentLongLongPairHashMap(16, 1);
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+
try {
map.put(-5, 3, 4, 4);
@@ -331,7 +412,10 @@ public class ConcurrentLongLongPairHashMapTest {
@Test
public void testAsMap() {
- ConcurrentLongLongPairHashMap lmap = new
ConcurrentLongLongPairHashMap(16, 1);
+ ConcurrentLongLongPairHashMap lmap =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
lmap.put(1, 1, 11, 11);
lmap.put(2, 2, 22, 22);
lmap.put(3, 3, 33, 33);