lhotari commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1747682951


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -274,96 +251,58 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             }
         }
 
-        final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
-        groupedEntries.clear();
-        final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new 
HashMap<>();
+        // returns a boolean indicating whether look-ahead could be useful, 
when there's a consumer
+        // with available permits, and it's not able to make progress because 
of blocked hashes.
+        MutableBoolean triggerLookAhead = new MutableBoolean();
+        // filter and group the entries by consumer for dispatching
+        final Map<Consumer, List<Entry>> entriesByConsumerForDispatching =
+                filterAndGroupEntriesForDispatching(entries, readType, 
triggerLookAhead);
 
-        for (Entry entry : entries) {
-            int stickyKeyHash = getStickyKeyHash(entry);
-            Consumer c = selector.select(stickyKeyHash);
-            if (c != null) {
-                groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
-                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new 
HashSet<>()).add(stickyKeyHash);
-            } else {
-                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
-                entry.release();
-            }
-        }
-
-        AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
-
-        int currentThreadKeyNumber = groupedEntries.size();
-        if (currentThreadKeyNumber == 0) {
-            currentThreadKeyNumber = -1;
-        }
-        for (Map.Entry<Consumer, List<Entry>> current : 
groupedEntries.entrySet()) {
+        AtomicInteger remainingConsumersToFinishSending = new 
AtomicInteger(entriesByConsumerForDispatching.size());
+        for (Map.Entry<Consumer, List<Entry>> current : 
entriesByConsumerForDispatching.entrySet()) {
             Consumer consumer = current.getKey();
-            assert consumer != null; // checked when added to groupedEntries
-            List<Entry> entriesWithSameKey = current.getValue();
-            int entriesWithSameKeyCount = entriesWithSameKey.size();
-            int availablePermits = getAvailablePermits(consumer);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
-                    
entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()),
 availablePermits,
-                    readType, consumerStickyKeyHashesMap.get(consumer));
+            List<Entry> entriesForConsumer = current.getValue();
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read 
type is {}",
-                        name, consumer.consumerName(), messagesForC, readType);
+                        name, consumer.consumerName(), 
entriesForConsumer.size(), readType);
             }
-
-            if (messagesForC < entriesWithSameKeyCount) {
-                // We are not able to push all the messages with given key to 
its consumer,
-                // so we discard for now and mark them for later redelivery
-                for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
-                    Entry entry = entriesWithSameKey.get(i);
-                    long stickyKeyHash = getStickyKeyHash(entry);
-                    addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId(), stickyKeyHash);
-                    entry.release();
-                    entriesWithSameKey.set(i, null);
+            final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+            for (Entry entry : entriesForConsumer) {
+                // remove positions first from replay list first : 
sendMessages recycles entries
+                if (readType == ReadType.Replay) {
+                    redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                 }
-            }
-
-            if (messagesForC > 0) {
-                final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
-                for (int i = 0; i < messagesForC; i++) {
-                    final Entry entry = entriesWithSameKey.get(i);
-                    // remove positions first from replay list first : 
sendMessages recycles entries
-                    if (readType == ReadType.Replay) {
-                        redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
-                    }
-                    // Add positions to individuallySentPositions if necessary
-                    if (!allowOutOfOrderDelivery) {
-                        final Position position = entry.getPosition();
-                        // Store to individuallySentPositions even if 
lastSentPosition is null
-                        if ((lastSentPosition == null || 
position.compareTo(lastSentPosition) > 0)
-                                && 
!individuallySentPositions.contains(position.getLedgerId(), 
position.getEntryId())) {
-                            final Position previousPosition = 
managedLedger.getPreviousPosition(position);
-                            
individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(),
-                                    previousPosition.getEntryId(), 
position.getLedgerId(), position.getEntryId());
-                        }
+                // Add positions to individuallySentPositions if necessary
+                if (!allowOutOfOrderDelivery) {
+                    final Position position = entry.getPosition();
+                    // Store to individuallySentPositions even if 
lastSentPosition is null
+                    if ((lastSentPosition == null || 
position.compareTo(lastSentPosition) > 0)
+                            && 
!individuallySentPositions.contains(position.getLedgerId(), 
position.getEntryId())) {
+                        final Position previousPosition = 
managedLedger.getPreviousPosition(position);
+                        
individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(),
+                                previousPosition.getEntryId(), 
position.getLedgerId(), position.getEntryId());
                     }
                 }
+            }
 
-                SendMessageInfo sendMessageInfo = 
SendMessageInfo.getThreadLocal();
-                EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
-                EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(messagesForC);
-                totalEntries += filterEntriesForConsumer(entriesWithSameKey, 
batchSizes, sendMessageInfo,
-                        batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
-                consumer.sendMessages(entriesWithSameKey, batchSizes, 
batchIndexesAcks,
-                        sendMessageInfo.getTotalMessages(),
-                        sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
-                        getRedeliveryTracker()).addListener(future -> {
-                    if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
-                        readMoreEntries();
-                    }
-                });
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+            EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesForConsumer.size());
+            EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entriesForConsumer.size());
+            totalEntries += filterEntriesForConsumer(entriesForConsumer, 
batchSizes, sendMessageInfo,
+                    batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
+            consumer.sendMessages(entriesForConsumer, batchSizes, 
batchIndexesAcks,
+                    sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
+                    getRedeliveryTracker()).addListener(future -> {
+                if (future.isDone() && 
remainingConsumersToFinishSending.decrementAndGet() == 0) {
+                    readMoreEntries();
+                }
+            });
 
-                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
-                        -(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));
-                totalMessagesSent += sendMessageInfo.getTotalMessages();
-                totalBytesSent += sendMessageInfo.getTotalBytes();
-            } else {
-                currentThreadKeyNumber = keyNumbers.decrementAndGet();
-            }
+            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
+                    -(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));

Review Comment:
   Unit should be in batch messages 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to