merlimat commented on a change in pull request #4062: Delayed message delivery
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r279046388
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry>
entries, SendMessageInfo sent
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
- int batchSize = getBatchSizeforEntry(metadataAndPayload,
subscription, consumerId);
- if (batchSize == -1) {
- // this would suggest that the message might have been
corrupted
+ MessageMetadata msgMetadata =
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+ PositionImpl pos = (PositionImpl) entry.getPosition();
+ if (msgMetadata == null) {
+ // Message metadata was corrupted
iter.remove();
- PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos),
AckType.Individual, Collections.emptyMap());
continue;
+ } else if (msgMetadata.hasDeliverAtTime()
Review comment:
> The only reason I can see to do this is that you're trying to only call
peekMessageMetadata once. However, the mutation means you are having to copy
the list before the call to sendMessages, which is creating extra GC pressure
even when tracking is disabled.
The problem was already present before this change. If we fail the checksum
or other error, the message is not pushed to consumer and removed from the list
and would have cause the same modification error on the list.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services