mathieudruart commented on pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#issuecomment-1029377970
Hi @KKcorps
We have tested your PR and it appears to miss messages. The problem seems to
be in the method **getNextStreamParitionMsgOffsetAtIndex**: when the message
is part of a Pulsar batch (BatchMessageIdImpl), the entry id is incremented
by 1 every time. That doesn't seem correct, because the next message id within
the batch keeps the same entry id and only has its batch index incremented (all
messages inside a Pulsar batch share the same entry id).
We tried the following version of the method, and it seems to receive all
messages correctly:
```
@Override
public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
  MessageIdImpl currentMessageId =
      MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
  MessageId nextMessageId;

  long currentLedgerId = currentMessageId.getLedgerId();
  long currentEntryId = currentMessageId.getEntryId();
  int currentPartitionIndex = currentMessageId.getPartitionIndex();

  if (currentMessageId instanceof BatchMessageIdImpl) {
    int currentBatchIndex = ((BatchMessageIdImpl) currentMessageId).getBatchIndex();
    int currentBatchSize = ((BatchMessageIdImpl) currentMessageId).getBatchSize();

    if (currentBatchIndex < currentBatchSize - 1) {
      // Not the last message of the batch: keep the entry id, advance the batch index.
      nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId,
          currentPartitionIndex, currentBatchIndex + 1, currentBatchSize,
          ((BatchMessageIdImpl) currentMessageId).getAcker());
    } else {
      // Last message of the batch: move to the next entry and restart at batch index 0.
      nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1,
          currentPartitionIndex, 0, currentBatchSize,
          ((BatchMessageIdImpl) currentMessageId).getAcker());
    }
  } else {
    // Non-batched message: the next message id is the next entry.
    nextMessageId = DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1,
        currentPartitionIndex);
  }
  return new MessageIdStreamOffset(nextMessageId);
}
```
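
To make the expected sequence easier to check without the Pulsar client on the classpath, here is a minimal, self-contained sketch of the same advancement rule. `BatchOffsetSketch`, `Pos`, and `next` are hypothetical names used only for illustration (they are not Pinot or Pulsar classes), and a batch index of -1 is an assumed convention for "not batched". Like the method above, it assumes the entry following a batch starts again at batch index 0.

```java
public class BatchOffsetSketch {

  /** Hypothetical stand-in for a message id; batchIndex == -1 means "not batched". */
  static final class Pos {
    final long ledgerId;
    final long entryId;
    final int batchIndex;
    final int batchSize;

    Pos(long ledgerId, long entryId, int batchIndex, int batchSize) {
      this.ledgerId = ledgerId;
      this.entryId = entryId;
      this.batchIndex = batchIndex;
      this.batchSize = batchSize;
    }

    @Override
    public String toString() {
      return ledgerId + ":" + entryId + ":" + batchIndex;
    }
  }

  /** Returns the position expected to follow {@code current}. */
  static Pos next(Pos current) {
    boolean batched = current.batchIndex >= 0;
    if (batched && current.batchIndex < current.batchSize - 1) {
      // Still inside the batch: same entry id, next batch index.
      return new Pos(current.ledgerId, current.entryId, current.batchIndex + 1,
          current.batchSize);
    }
    if (batched) {
      // Last message of the batch: next entry, batch index restarts at 0.
      return new Pos(current.ledgerId, current.entryId + 1, 0, current.batchSize);
    }
    // Non-batched message: simply the next entry.
    return new Pos(current.ledgerId, current.entryId + 1, -1, 0);
  }

  public static void main(String[] args) {
    // Walk through a batch of 3 messages stored in entry 42 of ledger 7.
    Pos p = new Pos(7L, 42L, 0, 3);
    for (int i = 0; i < 4; i++) {
      System.out.println(p);
      p = next(p);
    }
  }
}
```

Starting from the first message of a 3-message batch, the entry id stays at 42 for batch indexes 0, 1 and 2, and only then moves to entry 43. Incrementing the entry id on every message would instead jump to entry 43 right after the first message and skip the other two.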
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]