This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf5d6aac1b6 [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
bf5d6aac1b6 is described below

commit bf5d6aac1b62d195c544a486bcefec676948a3a4
Author: Andrey Yegorov <[email protected]>
AuthorDate: Fri Apr 26 13:22:19 2024 -0700

    [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 21 ++++++++++++---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 30 +++++++++++++++++-----
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  8 +++---
 3 files changed, 45 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b2028095e2..b1c3687b3a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@ public class Consumer {
                 totalChunkedMessages, redeliveryTracker, 
DEFAULT_CONSUMER_EPOCH);
     }
 
+    public Future<Void> sendMessages(final List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
+                                     EntryBatchIndexesAcks batchIndexesAcks,
+                                     int totalMessages, long totalBytes, long 
totalChunkedMessages,
+                                     RedeliveryTracker redeliveryTracker, long 
epoch) {
+        return sendMessages(entries, null, batchSizes, batchIndexesAcks, 
totalMessages, totalBytes,
+                totalChunkedMessages, redeliveryTracker, epoch);
+    }
+
     /**
      * Dispatch a list of entries to the consumer. <br/>
      * <b>It is also responsible to release entries data and recycle entries 
object.</b>
      *
      * @return a SendMessageInfo object that contains the detail of what was 
sent to consumer
      */
-    public Future<Void> sendMessages(final List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
+    public Future<Void> sendMessages(final List<? extends Entry> entries,
+                                     final List<Integer> stickyKeyHashes,
+                                     EntryBatchSizes batchSizes,
                                      EntryBatchIndexesAcks batchIndexesAcks,
-                                     int totalMessages, long totalBytes, long 
totalChunkedMessages,
-                                     RedeliveryTracker redeliveryTracker, long 
epoch) {
+                                     int totalMessages,
+                                     long totalBytes,
+                                     long totalChunkedMessages,
+                                     RedeliveryTracker redeliveryTracker,
+                                     long epoch) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
 
         if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@ public class Consumer {
                 // because this consumer is possible to disconnect at this 
time.
                 if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
-                    int stickyKeyHash = getStickyKeyHash(entry);
+                    int stickyKeyHash = stickyKeyHashes == null ? 
getStickyKeyHash(entry) : stickyKeyHashes.get(i);
                     long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
                     if (ackSet != null) {
                         unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253f96e..fb7bd22de94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, List<Integer>>> 
localGroupedStickyKeyHashes =
+            new FastThreadLocal<Map<Consumer, List<Integer>>>() {
+                @Override
+                protected Map<Consumer, List<Integer>> initialValue() throws 
Exception {
+                    return new HashMap<>();
+                }
+            };
+
     @Override
     public void sendMessages(List<Entry> entries) {
         if (entries.isEmpty()) {
@@ -139,28 +147,38 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
         groupedEntries.clear();
+        final Map<Consumer, List<Integer>> consumerStickyKeyHashesMap = 
localGroupedStickyKeyHashes.get();
+        consumerStickyKeyHashesMap.clear();
 
         for (Entry entry : entries) {
-            Consumer consumer = 
selector.select(peekStickyKey(entry.getDataBuffer()));
+            byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+            int stickyKeyHash = 
StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+            Consumer consumer = selector.select(stickyKeyHash);
             if (consumer != null) {
-                groupedEntries.computeIfAbsent(consumer, k -> new 
ArrayList<>()).add(entry);
+                int startingSize = Math.max(10, entries.size() / (2 * 
consumerSet.size()));
+                groupedEntries.computeIfAbsent(consumer, k -> new 
ArrayList<>(startingSize)).add(entry);
+                consumerStickyKeyHashesMap
+                        .computeIfAbsent(consumer, k -> new 
ArrayList<>(startingSize)).add(stickyKeyHash);
             } else {
                 entry.release();
             }
         }
 
         for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : 
groupedEntries.entrySet()) {
-            Consumer consumer = entriesByConsumer.getKey();
-            List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+            final Consumer consumer = entriesByConsumer.getKey();
+            final List<Entry> entriesForConsumer = 
entriesByConsumer.getValue();
+            final List<Integer> stickyKeysForConsumer = 
consumerStickyKeyHashesMap.get(consumer);
 
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesForConsumer.size());
             filterEntriesForConsumer(entriesForConsumer, batchSizes, 
sendMessageInfo, null, null, false, consumer);
 
             if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
-                consumer.sendMessages(entriesForConsumer, batchSizes, null, 
sendMessageInfo.getTotalMessages(),
+                consumer.sendMessages(entriesForConsumer, 
stickyKeysForConsumer, batchSizes,
+                        null, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
-                        getRedeliveryTracker());
+                        getRedeliveryTracker(), 
Commands.DEFAULT_CONSUMER_EPOCH);
                 TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, 
-sendMessageInfo.getTotalMessages());
             } else {
                 entriesForConsumer.forEach(e -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index b2638d53ab1..6b0f48a57cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -128,15 +128,15 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
                 assertEquals(byteBuf.toString(UTF_8), "message" + index);
             };
             return mockPromise;
-        }).when(consumerMock).sendMessages(any(List.class), 
any(EntryBatchSizes.class), any(),
-                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        }).when(consumerMock).sendMessages(any(List.class), any(List.class), 
any(EntryBatchSizes.class), any(),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), 
anyLong());
         try {
             nonpersistentDispatcher.sendMessages(entries);
         } catch (Exception e) {
             fail("Failed to sendMessages.", e);
         }
-        verify(consumerMock, times(1)).sendMessages(any(List.class), 
any(EntryBatchSizes.class),
-                eq(null), anyInt(), anyLong(), anyLong(), 
any(RedeliveryTracker.class));
+        verify(consumerMock, times(1)).sendMessages(any(List.class), 
any(List.class), any(EntryBatchSizes.class),
+                eq(null), anyInt(), anyLong(), anyLong(), 
any(RedeliveryTracker.class), anyLong());
     }
 
     @Test(timeOut = 10000)

Reply via email to