merlimat commented on a change in pull request #6362: [ClientAPI]Fix hasMessageAvailable()
URL: https://github.com/apache/pulsar/pull/6362#discussion_r386619487
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
- topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
- }
- MessageIdData messageId = MessageIdData.newBuilder()
- .setLedgerId(((PositionImpl)position).getLedgerId())
- .setEntryId(((PositionImpl)position).getEntryId())
- .setPartition(partitionIndex)
- .build();
- ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+ getLargestBatchIndexWhenPossible(
+ topic,
+ (PositionImpl) position,
+ partitionIndex,
+ requestId,
+ consumer.getSubscription().getName());
+
} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
}
}
+ private void getLargestBatchIndexWhenPossible(
+ Topic topic,
+ PositionImpl position,
+ int partitionIndex,
+ long requestId,
+ String subscriptionName) {
+
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+
+ // If it's not pointing to a valid entry, respond messageId of the current position.
+ if (position.getEntryId() == -1) {
+ MessageIdData messageId = MessageIdData.newBuilder()
+ .setLedgerId(position.getLedgerId())
+ .setEntryId(position.getEntryId())
+ .setPartition(partitionIndex).build();
+
+ ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+ }
+
+ // For a valid position, we read the entry out and parse the batch size from its metadata.
+ CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
+ ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
Review comment:
Ideally we should avoid this read by recording the number of messages in
the last batch along with the current position.
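
A minimal sketch of that suggestion, assuming the write path can report the batch size of each entry once it is persisted. Every name here (`LastPublishedTracker`, `Snapshot`, `onEntryPersisted`, `getLast`) is hypothetical and is not part of the Pulsar or managed-ledger API. It also assumes the caller passes 0 for a non-batched entry and invokes the callback in persistence order. The point is only that the last batch index can be answered from memory, without an `asyncReadEntry` call.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: none of these names exist in Pulsar.
final class LastPublishedTracker {

    /** Immutable snapshot: last persisted position plus the message count of that entry. */
    static final class Snapshot {
        final long ledgerId;
        final long entryId;
        final int numMessagesInBatch; // assumption: 0 means "not a batch"

        Snapshot(long ledgerId, long entryId, int numMessagesInBatch) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.numMessagesInBatch = numMessagesInBatch;
        }

        /** Index of the last message in the batch, or -1 if there is no valid entry or no batch. */
        int lastBatchIndex() {
            if (entryId < 0 || numMessagesInBatch <= 0) {
                return -1;
            }
            return numMessagesInBatch - 1;
        }
    }

    // Position and batch size are swapped together so readers never see a mixed state.
    private final AtomicReference<Snapshot> last =
            new AtomicReference<>(new Snapshot(-1L, -1L, 0));

    /** Assumed to be called by the write path, in order, after each entry is persisted. */
    void onEntryPersisted(long ledgerId, long entryId, int numMessagesInBatch) {
        last.set(new Snapshot(ledgerId, entryId, numMessagesInBatch));
    }

    /** Returns the latest snapshot without touching storage. */
    Snapshot getLast() {
        return last.get();
    }
}
```

With something like this in place, the handler could build the response from `getLast()` directly. Keeping the position and the batch size in one immutable object is a design choice to avoid pairing a new position with a stale batch size.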