ivans0773 commented on code in PR #21396:
URL: https://github.com/apache/pulsar/pull/21396#discussion_r1366525612


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -256,41 +257,36 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
                 availablePermits = Math.min(availablePermits, remainUnAckedMessages);
             }
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
+            List<Entry> entriesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
                     readType, consumerStickyKeyHashesMap.get(consumer));
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read type is {}",
-                        name, consumer.consumerName(), messagesForC, readType);
+                        name, consumer.consumerName(), entriesForC.size(), readType);
             }
 
-            if (messagesForC < entriesWithSameKeyCount) {
-                // We are not able to push all the messages with given key to its consumer,
-                // so we discard for now and mark them for later redelivery
-                for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
-                    Entry entry = entriesWithSameKey.get(i);
-                    long stickyKeyHash = getStickyKeyHash(entry);
-                    addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
-                    entry.release();
-                    entriesWithSameKey.set(i, null);
-                }
-            }
-
-            if (messagesForC > 0) {
+            // We are not able to push all the messages with given key to its consumer,
+            // so we discard for now and mark them for later redelivery
+            entriesWithSameKey.stream()
+                    .filter(entryWithTheSameKey -> !entriesForC.contains(entryWithTheSameKey))
+                    .forEach(entryToReplay -> {

Review Comment:
   Good note, refactored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
