lhotari commented on code in PR #21396:
URL: https://github.com/apache/pulsar/pull/21396#discussion_r1365400911


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -256,41 +257,36 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
                 availablePermits = Math.min(availablePermits, remainUnAckedMessages);
             }
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
+            List<Entry> entriesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
                     readType, consumerStickyKeyHashesMap.get(consumer));
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read type is {}",
-                        name, consumer.consumerName(), messagesForC, readType);
+                        name, consumer.consumerName(), entriesForC.size(), readType);
             }
 
-            if (messagesForC < entriesWithSameKeyCount) {
-                // We are not able to push all the messages with given key to its consumer,
-                // so we discard for now and mark them for later redelivery
-                for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
-                    Entry entry = entriesWithSameKey.get(i);
-                    long stickyKeyHash = getStickyKeyHash(entry);
-                    addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
-                    entry.release();
-                    entriesWithSameKey.set(i, null);
-                }
-            }
-
-            if (messagesForC > 0) {
+            // We are not able to push all the messages with given key to its consumer,
+            // so we discard for now and mark them for later redelivery
+            entriesWithSameKey.stream()
+                    .filter(entryWithTheSameKey -> !entriesForC.contains(entryWithTheSameKey))
+                    .forEach(entryToReplay -> {

Review Comment:
   Btw, in the Pulsar code base the Java Streams API is avoided in performance 
hotspots to reduce GC pressure. I'm not sure how much that helps in practice, 
but it's one reason plain `for` loops are preferred. :)
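
To illustrate the suggestion, here is a minimal sketch of the same "everything not selected" filter written as a plain indexed loop. It is not the PR's code: `ReplayLoopSketch` and `notSelected` are hypothetical names, and strings stand in for `Entry` objects. The result list is allocated only so the sketch has something to return. Real dispatcher code would act on each element inside the loop instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplayLoopSketch {

    // Hypothetical helper, not part of Pulsar: returns the elements of
    // `all` that are absent from `selected`, using an indexed loop
    // instead of stream().filter(...).forEach(...).
    static <T> List<T> notSelected(List<T> all, List<T> selected) {
        List<T> toReplay = new ArrayList<>();
        for (int i = 0; i < all.size(); i++) {
            T candidate = all.get(i);
            // Same membership check as the stream version; still a
            // linear scan of `selected` for every candidate.
            if (!selected.contains(candidate)) {
                toReplay.add(candidate);
            }
        }
        return toReplay;
    }

    public static void main(String[] args) {
        // Strings stand in for entries that share one sticky key.
        List<String> entriesWithSameKey = Arrays.asList("e1", "e2", "e3", "e4");
        List<String> entriesForC = Arrays.asList("e1", "e2");
        System.out.println(notSelected(entriesWithSameKey, entriesForC));
    }
}
```

The loop avoids creating the stream pipeline and lambda objects on each call, but whether that is measurable in this code path would need profiling.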



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.