This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9352feb7a1cd466d3c570fc8d2d2f5365abe5ebc Author: Jiwei Guo <[email protected]> AuthorDate: Thu Apr 28 11:06:15 2022 +0800 [improve][broker] Use shrink map for message redelivery. (#15342) (cherry picked from commit 615f05af3e7c72d83b3fe24f64566ed58244ea5d) --- .../broker/service/persistent/MessageRedeliveryController.java | 9 ++++++--- .../service/persistent/MessageRedeliveryControllerTest.java | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index be143565c48..c7f96fffcef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -26,8 +26,8 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; -import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; import org.apache.pulsar.common.util.collections.LongPairSet; @@ -37,7 +37,10 @@ public class MessageRedeliveryController { public MessageRedeliveryController(boolean allowOutOfOrderDelivery) { this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); - this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2); + this.hashesToBeBlocked = allowOutOfOrderDelivery + ? null + : ConcurrentLongLongPairHashMap + .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build(); } public boolean add(long ledgerId, long entryId) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java index 9a785f6f95f..478677a25e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java @@ -30,7 +30,7 @@ import java.lang.reflect.Field; import java.util.Set; import java.util.TreeSet; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.common.util.collections.LongPairSet; import org.testng.annotations.DataProvider; import org.testng.annotations.Test;
