lhotari commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645299661
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -114,9 +114,11 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
- MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
- ? entryWrapper.get()[i].getMetadata()
+ int entryWrapperIndex = i + entryWrapperOffset;
+ MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
+ ? entryWrapper.get()[entryWrapperIndex].getMetadata()
: null;
+ System.out.println(msgMetadata.getNumMessagesInBatch());
Review comment:
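Forgotten `System.out.println` debug logging? Note that `msgMetadata` can be
`null` here (see the ternary just above), so this line could also throw a
`NullPointerException`.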
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
- filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
- sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+ filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+ batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
int msgSent = sendMessageInfo.getTotalMessages();
+ totalMessages -= msgSent;
Review comment:
The name `totalMessages` is a bit confusing. Since it is decremented after
each send, wouldn't `remainingMessages` describe it better?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
- filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
- sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+ filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+ batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
int msgSent = sendMessageInfo.getTotalMessages();
+ totalMessages -= msgSent;
Review comment:
I'm trying to understand the logic behind the changes made in this PR, and I
have a question about the calculations.
A few lines below, on line 545, there is this code:
```
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
        -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
```
Just wondering whether `batchIndexesAcks.getTotalAckedIndexCount()` should
also be taken into account in the calculation `totalMessages -= msgSent`. Why
does it affect the `totalAvailablePermits` field (updated with
`TOTAL_AVAILABLE_PERMITS_UPDATER`) but not `totalMessages`?
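
To make the question concrete, here is a small standalone sketch of the two updates side by side. It is not the dispatcher code: the class `PermitAccountingSketch`, the method `recordSend`, the field `remainingMessages` and all the numbers are made up for illustration, and I'm only assuming the two counters are updated exactly as in the lines quoted above.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Standalone illustration only. Everything except the shape of the two
// update expressions is hypothetical and not taken from the Pulsar sources.
public class PermitAccountingSketch {
    private static final AtomicIntegerFieldUpdater<PermitAccountingSketch>
            TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
                    PermitAccountingSketch.class, "totalAvailablePermits");

    // Must be a volatile int for AtomicIntegerFieldUpdater to accept it.
    private volatile int totalAvailablePermits;
    // Stands in for the PR's `totalMessages` counter.
    private int remainingMessages;

    public PermitAccountingSketch(int permits, int remainingMessages) {
        this.totalAvailablePermits = permits;
        this.remainingMessages = remainingMessages;
    }

    // Applies both updates the way the quoted code does.
    public void recordSend(int msgSent, int totalAckedIndexCount) {
        // Mirrors `totalMessages -= msgSent`: the acked count is ignored.
        remainingMessages -= msgSent;
        // Mirrors the permits update: the acked count reduces the deduction.
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - totalAckedIndexCount));
    }

    public int getRemainingMessages() {
        return remainingMessages;
    }

    public int getTotalAvailablePermits() {
        return totalAvailablePermits;
    }

    public static void main(String[] args) {
        PermitAccountingSketch s = new PermitAccountingSketch(100, 10);
        s.recordSend(10, 3); // hypothetical: 10 messages sent, 3 batch indexes already acked
        System.out.println("remainingMessages=" + s.getRemainingMessages()
                + ", totalAvailablePermits=" + s.getTotalAvailablePermits());
    }
}
```

With these made-up numbers the remaining count drops by 10 while the permits drop by only 7. If the acked index count were also applied to `totalMessages`, the remaining count would be 3 instead of 0, which is the discrepancy I'd like to understand.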