ivankelly commented on a change in pull request #4062: Delayed message delivery implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r279320918
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been corrupted
+            MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+            if (msgMetadata == null) {
+                // Message metadata was corrupted
                 iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
                 subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
                 continue;
+            } else if (msgMetadata.hasDeliverAtTime()
Review comment:
> Agree, the method was already like that, and it could already modify the
> list before. Probably it's about time to rename it.

It would be nice to do the peek before the send is called and store the
result somewhere. Unfortunately, I don't see a nice way to do so.

Also, it's not just the mutation that I find weird. The data also seems to
ping-pong between objects.
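
For illustration only, here is one possible shape for a peek-before-send step that avoids mutating the caller's list. It is a rough sketch, not the code in this PR: `PeekBeforeSend`, `Metadata`, `Entry`, `Filtered`, and `filter` are all hypothetical stand-ins rather than Pulsar classes, a `null` metadata is assumed to model a corrupted message, and the "deliver later" test is assumed to be a plain timestamp comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these types are Pulsar's real classes.
final class PeekBeforeSend {

    // Stand-in for MessageMetadata; deliverAtTime < 0 means "not set" (assumption).
    static final class Metadata {
        final long deliverAtTime;
        Metadata(long deliverAtTime) { this.deliverAtTime = deliverAtTime; }
        boolean hasDeliverAtTime() { return deliverAtTime >= 0; }
    }

    // Stand-in for Entry; a null metadata models a corrupted message (assumption).
    static final class Entry {
        final long id;
        final Metadata metadata;
        Entry(long id, Metadata metadata) { this.id = id; this.metadata = metadata; }
    }

    // Result of a single peek pass; the input list is never mutated.
    static final class Filtered {
        final List<Entry> toSend = new ArrayList<>();
        final List<Entry> corrupted = new ArrayList<>();
        final List<Entry> delayed = new ArrayList<>();
    }

    // Look at each entry's metadata once and sort the entry into a bucket.
    static Filtered filter(List<Entry> entries, long nowMillis) {
        Filtered result = new Filtered();
        for (Entry entry : entries) {
            Metadata md = entry.metadata; // the "peek", done once per entry
            if (md == null) {
                result.corrupted.add(entry); // caller would ack and release these
            } else if (md.hasDeliverAtTime() && md.deliverAtTime > nowMillis) {
                result.delayed.add(entry);   // caller would track these for later
            } else {
                result.toSend.add(entry);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry(1, new Metadata(-1)),
                new Entry(2, null),
                new Entry(3, new Metadata(5_000)));
        Filtered f = filter(entries, 1_000);
        System.out.println(f.toSend.size() + " to send, " + f.corrupted.size()
                + " corrupted, " + f.delayed.size() + " delayed");
    }
}
```

With a result object like this, the send path would receive only `toSend`, and the acknowledgement of corrupted entries and the tracking of delayed ones could happen in the caller rather than as side effects of the permit update.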