codelipenghui commented on code in PR #19167:
URL: https://github.com/apache/pulsar/pull/19167#discussion_r1065418454
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java:
##########
@@ -23,52 +23,86 @@
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import
org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
+/**
+ * The MessageRedeliveryController is a non-thread-safe container for
maintaining the redelivery messages.
+ */
public class MessageRedeliveryController {
+
+ private final boolean allowOutOfOrderDelivery;
private final ConcurrentBitmapSortedLongPairSet messagesToRedeliver;
private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+ private final ConcurrentLongLongHashMap hashesRefCount;
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+ this.allowOutOfOrderDelivery = allowOutOfOrderDelivery;
this.messagesToRedeliver = new ConcurrentBitmapSortedLongPairSet();
- this.hashesToBeBlocked = allowOutOfOrderDelivery
- ? null
- : ConcurrentLongLongPairHashMap
+ if (!allowOutOfOrderDelivery) {
+ this.hashesToBeBlocked = ConcurrentLongLongPairHashMap
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
+ this.hashesRefCount = ConcurrentLongLongHashMap
+
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
+ } else {
+ this.hashesToBeBlocked = null;
Review Comment:
Yes. Maybe we can find a better way to combine 2 structs in another PR. For
this PR, just focus on the problem fix, and we need to cherry-pick to release
branches. Otherwise, the producer side will encounter a high publish latency if
the issue happens.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]