merlimat commented on a change in pull request #4062: Delayed message delivery
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r286156906
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -284,42 +288,69 @@ private void incrementUnackedMessages(int ackedMessages)
{
}
}
- public static int getBatchSizeforEntry(ByteBuf metadataAndPayload,
Subscription subscription, long consumerId) {
+ public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload,
Subscription subscription,
+ long consumerId) {
+ MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload,
subscription, consumerId);
+ if (msgMetadata == null) {
+ return -1;
+ } else {
+ int numMessagesInBatch = msgMetadata.getNumMessagesInBatch();
+ msgMetadata.recycle();
+ return numMessagesInBatch;
+ }
+ }
+
+ public static MessageMetadata peekMessageMetadata(ByteBuf
metadataAndPayload, Subscription subscription,
+ long consumerId) {
try {
// save the reader index and restore after parsing
- metadataAndPayload.markReaderIndex();
+ int readerIdx = metadataAndPayload.readerIndex();
PulsarApi.MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
- metadataAndPayload.resetReaderIndex();
- int batchSize = metadata.getNumMessagesInBatch();
- metadata.recycle();
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] num messages in batch are {} ",
subscription, consumerId, batchSize);
- }
- return batchSize;
+ metadataAndPayload.readerIndex(readerIdx);
+
+ return metadata;
} catch (Throwable t) {
log.error("[{}] [{}] Failed to parse message metadata",
subscription, consumerId, t);
+ return null;
}
- return -1;
}
- void updatePermitsAndPendingAcks(final List<Entry> entries,
SendMessageInfo sentMessages) throws PulsarServerException {
+ private void updatePermitsAndFilterMessages(final List<Entry> entries,
SendMessageInfo sentMessages) throws PulsarServerException {
int permitsToReduce = 0;
- Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
long totalReadableBytes = 0;
boolean clientSupportBatchMessages =
cnx.isBatchMessageCompatibleVersion();
- while (iter.hasNext()) {
- Entry entry = iter.next();
+
+ for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
+ Entry entry = entries.get(i);
ByteBuf metadataAndPayload = entry.getDataBuffer();
- int batchSize = getBatchSizeforEntry(metadataAndPayload,
subscription, consumerId);
- if (batchSize == -1) {
- // this would suggest that the message might have been
corrupted
- iter.remove();
- PositionImpl pos = (PositionImpl) entry.getPosition();
- entry.release();
-
subscription.acknowledgeMessage(Collections.singletonList(pos),
AckType.Individual, Collections.emptyMap());
- continue;
+ PositionImpl pos = (PositionImpl) entry.getPosition();
+
+ int batchSize;
+ MessageMetadata msgMetadata =
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
Review comment:
Did the refactor in #4329
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services