This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e3cfbf8 Fix the out of index issue when dispatch messages based on
the avgBatchSizePerMsg. (#10828)
e3cfbf8 is described below
commit e3cfbf8fadb98bc83ddab057239eb024a00b5f6d
Author: lipenghui <[email protected]>
AuthorDate: Sat Jun 5 04:30:18 2021 +0800
Fix the out of index issue when dispatch messages based on the
avgBatchSizePerMsg. (#10828)
Using avgBatchSizePerMsg to calculate the number of entries to dispatch
might yield more entries than are remaining.
The fix is to use Math.min(start + messagesForC, entries.size()) as the end
index to avoid an index-out-of-bounds exception.
---
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9b8616b..6756c4a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -519,16 +519,16 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
if (messagesForC > 0) {
-
+ int end = Math.min(start + messagesForC, entries.size());
// remove positions first from replay list first :
sendMessages recycles entries
if (readType == ReadType.Replay) {
- entries.subList(start, start + messagesForC).forEach(entry
-> {
+ entries.subList(start, end).forEach(entry -> {
messagesToRedeliver.remove(entry.getLedgerId(),
entry.getEntryId());
});
}
SendMessageInfo sendMessageInfo =
SendMessageInfo.getThreadLocal();
- List<Entry> entriesForThisConsumer = entries.subList(start,
start + messagesForC);
+ List<Entry> entriesForThisConsumer = entries.subList(start,
end);
EntryBatchSizes batchSizes =
EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entriesForThisConsumer.size());