This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e310713be43 [improve][broker] Add ref count for sticky hash to
optimize the performance of Key_Shared subscription (#19167)
e310713be43 is described below
commit e310713be433e6e36a4191deb74ae8e5d0a03820
Author: Penghui Li <[email protected]>
AuthorDate: Wed Jan 11 11:01:13 2023 +0800
[improve][broker] Add ref count for sticky hash to optimize the performance
of Key_Shared subscription (#19167)
(cherry picked from commit 74313816826b373170c1fa095a0cae5ab6217224)
---
.../persistent/MessageRedeliveryController.java | 72 ++++++++++++++++------
.../MessageRedeliveryControllerTest.java | 34 ++++++++++
2 files changed, 88 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 7aaba9a4e10..e6febc722de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -22,22 +22,36 @@ import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import
org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
+/**
+ * The MessageRedeliveryController is a non-thread-safe container for
maintaining the redelivery messages.
+ */
+@NotThreadSafe
public class MessageRedeliveryController {
+
+ private final boolean allowOutOfOrderDelivery;
private final ConcurrentBitmapSortedLongPairSet messagesToRedeliver;
private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+ private final ConcurrentLongLongHashMap hashesRefCount;
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+ this.allowOutOfOrderDelivery = allowOutOfOrderDelivery;
this.messagesToRedeliver = new ConcurrentBitmapSortedLongPairSet();
- this.hashesToBeBlocked = allowOutOfOrderDelivery
- ? null
- : ConcurrentLongLongPairHashMap
+ if (!allowOutOfOrderDelivery) {
+ this.hashesToBeBlocked = ConcurrentLongLongPairHashMap
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
+ this.hashesRefCount = ConcurrentLongLongHashMap
+
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
+ } else {
+ this.hashesToBeBlocked = null;
+ this.hashesRefCount = null;
+ }
}
public void add(long ledgerId, long entryId) {
@@ -45,21 +59,43 @@ public class MessageRedeliveryController {
}
public void add(long ledgerId, long entryId, long stickyKeyHash) {
- if (hashesToBeBlocked != null) {
- hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+ if (!allowOutOfOrderDelivery) {
+ boolean inserted = hashesToBeBlocked.putIfAbsent(ledgerId,
entryId, stickyKeyHash, 0);
+ if (!inserted) {
+ hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+ } else {
+ // Return -1 means the key was not present
+ long stored = hashesRefCount.get(stickyKeyHash);
+ hashesRefCount.put(stickyKeyHash, stored > 0 ? ++stored : 1);
+ }
}
messagesToRedeliver.add(ledgerId, entryId);
}
public void remove(long ledgerId, long entryId) {
- if (hashesToBeBlocked != null) {
- hashesToBeBlocked.remove(ledgerId, entryId);
+ if (!allowOutOfOrderDelivery) {
+ removeFromHashBlocker(ledgerId, entryId);
}
messagesToRedeliver.remove(ledgerId, entryId);
}
+ private void removeFromHashBlocker(long ledgerId, long entryId) {
+ LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+ if (value != null) {
+ boolean removed = hashesToBeBlocked.remove(ledgerId, entryId,
value.first, 0);
+ if (removed) {
+ long exists = hashesRefCount.get(value.first);
+ if (exists == 1) {
+ hashesRefCount.remove(value.first, exists);
+ } else if (exists > 0) {
+ hashesRefCount.put(value.first, exists - 1);
+ }
+ }
+ }
+ }
+
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId)
{
- if (hashesToBeBlocked != null) {
+ if (!allowOutOfOrderDelivery) {
List<LongPair> keysToRemove = new ArrayList<>();
hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none)
-> {
if (ComparisonChain.start().compare(ledgerId,
markDeleteLedgerId).compare(entryId, markDeleteEntryId)
@@ -67,7 +103,7 @@ public class MessageRedeliveryController {
keysToRemove.add(new LongPair(ledgerId, entryId));
}
});
- keysToRemove.forEach(longPair ->
hashesToBeBlocked.remove(longPair.first, longPair.second));
+ keysToRemove.forEach(longPair ->
removeFromHashBlocker(longPair.first, longPair.second));
keysToRemove.clear();
}
messagesToRedeliver.removeUpTo(markDeleteLedgerId, markDeleteEntryId +
1);
@@ -78,8 +114,9 @@ public class MessageRedeliveryController {
}
public void clear() {
- if (hashesToBeBlocked != null) {
+ if (!allowOutOfOrderDelivery) {
hashesToBeBlocked.clear();
+ hashesRefCount.clear();
}
messagesToRedeliver.clear();
}
@@ -89,15 +126,14 @@ public class MessageRedeliveryController {
}
public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
- final AtomicBoolean isContained = new AtomicBoolean(false);
- if (hashesToBeBlocked != null) {
- hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none)
-> {
- if (!isContained.get() && stickyKeyHashes.contains((int)
stickyKeyHash)) {
- isContained.set(true);
+ if (!allowOutOfOrderDelivery) {
+ for (Integer stickyKeyHash : stickyKeyHashes) {
+ if (hashesRefCount.containsKey(stickyKeyHash)) {
+ return true;
}
- });
+ }
}
- return isContained.get();
+ return false;
}
public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 3cd6fc23de7..f1f2c9117b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,6 +30,7 @@ import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.testng.annotations.DataProvider;
@@ -56,16 +57,23 @@ public class MessageRedeliveryControllerTest {
ConcurrentLongLongPairHashMap hashesToBeBlocked =
(ConcurrentLongLongPairHashMap) hashesToBeBlockedField
.get(controller);
+ Field hashesRefCountField =
MessageRedeliveryController.class.getDeclaredField("hashesRefCount");
+ hashesRefCountField.setAccessible(true);
+ ConcurrentLongLongHashMap hashesRefCount = (ConcurrentLongLongHashMap)
hashesRefCountField.get(controller);
+
if (allowOutOfOrderDelivery) {
assertNull(hashesToBeBlocked);
+ assertNull(hashesRefCount);
} else {
assertNotNull(hashesToBeBlocked);
+ assertNotNull(hashesRefCount);
}
assertTrue(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 0);
if (!allowOutOfOrderDelivery) {
assertEquals(hashesToBeBlocked.size(), 0);
+ assertEquals(hashesRefCount.size(), 0);
}
controller.add(1, 1);
@@ -79,6 +87,7 @@ public class MessageRedeliveryControllerTest {
assertEquals(hashesToBeBlocked.size(), 0);
assertFalse(hashesToBeBlocked.containsKey(1, 1));
assertFalse(hashesToBeBlocked.containsKey(1, 2));
+ assertEquals(hashesRefCount.size(), 0);
}
controller.remove(1, 1);
@@ -90,6 +99,7 @@ public class MessageRedeliveryControllerTest {
assertFalse(messagesToRedeliver.contains(1, 2));
if (!allowOutOfOrderDelivery) {
assertEquals(hashesToBeBlocked.size(), 0);
+ assertEquals(hashesRefCount.size(), 0);
}
controller.add(2, 1, 100);
@@ -106,6 +116,20 @@ public class MessageRedeliveryControllerTest {
assertEquals(hashesToBeBlocked.get(2, 1).first, 100);
assertEquals(hashesToBeBlocked.get(2, 2).first, 101);
assertEquals(hashesToBeBlocked.get(2, 3).first, 101);
+ assertEquals(hashesRefCount.size(), 2);
+ assertEquals(hashesRefCount.get(100), 1);
+ assertEquals(hashesRefCount.get(101), 2);
+ }
+
+ controller.remove(2, 1);
+ controller.remove(2, 2);
+
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 1);
+ assertEquals(hashesToBeBlocked.get(2, 3).first, 101);
+ assertEquals(hashesRefCount.size(), 1);
+ assertEquals(hashesRefCount.get(100), -1);
+ assertEquals(hashesRefCount.get(101), 1);
}
controller.clear();
@@ -115,6 +139,8 @@ public class MessageRedeliveryControllerTest {
if (!allowOutOfOrderDelivery) {
assertEquals(hashesToBeBlocked.size(), 0);
assertTrue(hashesToBeBlocked.isEmpty());
+ assertEquals(hashesRefCount.size(), 0);
+ assertTrue(hashesRefCount.isEmpty());
}
controller.add(2, 2, 201);
@@ -137,6 +163,11 @@ public class MessageRedeliveryControllerTest {
assertEquals(hashesToBeBlocked.get(2, 2).first, 201);
assertEquals(hashesToBeBlocked.get(3, 1).first, 300);
assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+ assertEquals(hashesRefCount.size(), 4);
+ assertEquals(hashesRefCount.get(200), 1);
+ assertEquals(hashesRefCount.get(201), 1);
+ assertEquals(hashesRefCount.get(300), 1);
+ assertEquals(hashesRefCount.get(301), 1);
}
controller.removeAllUpTo(3, 1);
@@ -145,6 +176,8 @@ public class MessageRedeliveryControllerTest {
if (!allowOutOfOrderDelivery) {
assertEquals(hashesToBeBlocked.size(), 1);
assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+ assertEquals(hashesRefCount.size(), 1);
+ assertEquals(hashesRefCount.get(301), 1);
}
controller.removeAllUpTo(5, 10);
@@ -152,6 +185,7 @@ public class MessageRedeliveryControllerTest {
assertEquals(messagesToRedeliver.size(), 0);
if (!allowOutOfOrderDelivery) {
assertEquals(hashesToBeBlocked.size(), 0);
+ assertEquals(hashesRefCount.size(), 0);
}
}