This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bf5d6aac1b6 [improve][broker] perf: Reduce stickyHash calculations of
non-persistent topics in SHARED subscriptions (#22536)
bf5d6aac1b6 is described below
commit bf5d6aac1b62d195c544a486bcefec676948a3a4
Author: Andrey Yegorov <[email protected]>
AuthorDate: Fri Apr 26 13:22:19 2024 -0700
[improve][broker] perf: Reduce stickyHash calculations of non-persistent
topics in SHARED subscriptions (#22536)
---
.../org/apache/pulsar/broker/service/Consumer.java | 21 ++++++++++++---
...istentStickyKeyDispatcherMultipleConsumers.java | 30 +++++++++++++++++-----
...ntStickyKeyDispatcherMultipleConsumersTest.java | 8 +++---
3 files changed, 45 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b2028095e2..b1c3687b3a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@ public class Consumer {
totalChunkedMessages, redeliveryTracker,
DEFAULT_CONSUMER_EPOCH);
}
+ public Future<Void> sendMessages(final List<? extends Entry> entries,
EntryBatchSizes batchSizes,
+ EntryBatchIndexesAcks batchIndexesAcks,
+ int totalMessages, long totalBytes, long
totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker, long
epoch) {
+ return sendMessages(entries, null, batchSizes, batchIndexesAcks,
totalMessages, totalBytes,
+ totalChunkedMessages, redeliveryTracker, epoch);
+ }
+
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries
object.</b>
*
* @return a SendMessageInfo object that contains the detail of what was
sent to consumer
*/
- public Future<Void> sendMessages(final List<? extends Entry> entries,
EntryBatchSizes batchSizes,
+ public Future<Void> sendMessages(final List<? extends Entry> entries,
+ final List<Integer> stickyKeyHashes,
+ EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- int totalMessages, long totalBytes, long
totalChunkedMessages,
- RedeliveryTracker redeliveryTracker, long
epoch) {
+ int totalMessages,
+ long totalBytes,
+ long totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker,
+ long epoch) {
this.lastConsumedTimestamp = System.currentTimeMillis();
if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@ public class Consumer {
// because this consumer is possible to disconnect at this
time.
if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
- int stickyKeyHash = getStickyKeyHash(entry);
+ int stickyKeyHash = stickyKeyHashes == null ?
getStickyKeyHash(entry) : stickyKeyHashes.get(i);
long[] ackSet = batchIndexesAcks == null ? null :
batchIndexesAcks.getAckSet(i);
if (ackSet != null) {
unackedMessages -= (batchSize -
BitSet.valueOf(ackSet).cardinality());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253f96e..fb7bd22de94 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
}
};
+ private static final FastThreadLocal<Map<Consumer, List<Integer>>>
localGroupedStickyKeyHashes =
+ new FastThreadLocal<Map<Consumer, List<Integer>>>() {
+ @Override
+ protected Map<Consumer, List<Integer>> initialValue() throws
Exception {
+ return new HashMap<>();
+ }
+ };
+
@Override
public void sendMessages(List<Entry> entries) {
if (entries.isEmpty()) {
@@ -139,28 +147,38 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
groupedEntries.clear();
+ final Map<Consumer, List<Integer>> consumerStickyKeyHashesMap =
localGroupedStickyKeyHashes.get();
+ consumerStickyKeyHashesMap.clear();
for (Entry entry : entries) {
- Consumer consumer =
selector.select(peekStickyKey(entry.getDataBuffer()));
+ byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+ int stickyKeyHash =
StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+ Consumer consumer = selector.select(stickyKeyHash);
if (consumer != null) {
- groupedEntries.computeIfAbsent(consumer, k -> new
ArrayList<>()).add(entry);
+ int startingSize = Math.max(10, entries.size() / (2 *
consumerSet.size()));
+ groupedEntries.computeIfAbsent(consumer, k -> new
ArrayList<>(startingSize)).add(entry);
+ consumerStickyKeyHashesMap
+ .computeIfAbsent(consumer, k -> new
ArrayList<>(startingSize)).add(stickyKeyHash);
} else {
entry.release();
}
}
for (Map.Entry<Consumer, List<Entry>> entriesByConsumer :
groupedEntries.entrySet()) {
- Consumer consumer = entriesByConsumer.getKey();
- List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+ final Consumer consumer = entriesByConsumer.getKey();
+ final List<Entry> entriesForConsumer =
entriesByConsumer.getValue();
+ final List<Integer> stickyKeysForConsumer =
consumerStickyKeyHashesMap.get(consumer);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes =
EntryBatchSizes.get(entriesForConsumer.size());
filterEntriesForConsumer(entriesForConsumer, batchSizes,
sendMessageInfo, null, null, false, consumer);
if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
- consumer.sendMessages(entriesForConsumer, batchSizes, null,
sendMessageInfo.getTotalMessages(),
+ consumer.sendMessages(entriesForConsumer,
stickyKeysForConsumer, batchSizes,
+ null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
- getRedeliveryTracker());
+ getRedeliveryTracker(),
Commands.DEFAULT_CONSUMER_EPOCH);
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-sendMessageInfo.getTotalMessages());
} else {
entriesForConsumer.forEach(e -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index b2638d53ab1..6b0f48a57cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -128,15 +128,15 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
assertEquals(byteBuf.toString(UTF_8), "message" + index);
};
return mockPromise;
- }).when(consumerMock).sendMessages(any(List.class),
any(EntryBatchSizes.class), any(),
- anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+ }).when(consumerMock).sendMessages(any(List.class), any(List.class),
any(EntryBatchSizes.class), any(),
+ anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class),
anyLong());
try {
nonpersistentDispatcher.sendMessages(entries);
} catch (Exception e) {
fail("Failed to sendMessages.", e);
}
- verify(consumerMock, times(1)).sendMessages(any(List.class),
any(EntryBatchSizes.class),
- eq(null), anyInt(), anyLong(), anyLong(),
any(RedeliveryTracker.class));
+ verify(consumerMock, times(1)).sendMessages(any(List.class),
any(List.class), any(EntryBatchSizes.class),
+ eq(null), anyInt(), anyLong(), anyLong(),
any(RedeliveryTracker.class), anyLong());
}
@Test(timeOut = 10000)