This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 615f05af3e7 [improve][broker] Use shrink map for message redelivery. (#15342)
615f05af3e7 is described below
commit 615f05af3e7c72d83b3fe24f64566ed58244ea5d
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Apr 28 11:06:15 2022 +0800
[improve][broker] Use shrink map for message redelivery. (#15342)
---
.../broker/service/persistent/MessageRedeliveryController.java | 9 ++++++---
.../service/persistent/MessageRedeliveryControllerTest.java | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index be143565c48..c7f96fffcef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -37,7 +37,10 @@ public class MessageRedeliveryController {
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
- this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+ this.hashesToBeBlocked = allowOutOfOrderDelivery
+ ? null
+ : ConcurrentLongLongPairHashMap
+ .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
}
public boolean add(long ledgerId, long entryId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 9a785f6f95f..478677a25e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,7 +30,7 @@ import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;