mathieudruart commented on pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#issuecomment-1029377970


   Hi @KKcorps 
   
   We tested your PR and it appears to miss messages. The problem seems to be
   in the method **getNextStreamParitionMsgOffsetAtIndex**: when the message is
   part of a Pulsar batch (BatchMessageIdImpl), the method adds 1 to the entry
   id every time. That does not look correct, because all messages inside a
   Pulsar batch share the same entry id, so the next message id keeps the same
   entry id and only its batch index is incremented.
   We tried the following version of the method, and it seems to get all
   messages correctly:
   
   ```
     @Override
     public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
       MessageIdImpl currentMessageId =
           MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
       MessageId nextMessageId;

       long currentLedgerId = currentMessageId.getLedgerId();
       long currentEntryId = currentMessageId.getEntryId();
       int currentPartitionIndex = currentMessageId.getPartitionIndex();

       if (currentMessageId instanceof BatchMessageIdImpl) {
         int currentBatchIndex = ((BatchMessageIdImpl) currentMessageId).getBatchIndex();
         int currentBatchSize = ((BatchMessageIdImpl) currentMessageId).getBatchSize();

         if (currentBatchIndex < currentBatchSize - 1) {
           // Not the last message of the batch: same entry id, next batch index.
           nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId,
               currentPartitionIndex, currentBatchIndex + 1, currentBatchSize,
               ((BatchMessageIdImpl) currentMessageId).getAcker());
         } else {
           // Last message of the batch: first message of the next entry.
           nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1,
               currentPartitionIndex, 0, currentBatchSize,
               ((BatchMessageIdImpl) currentMessageId).getAcker());
         }
       } else {
         // Non-batched message: the next message is simply the next entry.
         nextMessageId =
             DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1,
                 currentPartitionIndex);
       }
       return new MessageIdStreamOffset(nextMessageId);
     }
   ```
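
To make the expected progression concrete, here is a minimal standalone sketch of the same rule. It does not use any Pulsar or Pinot classes: `BatchOffsetSketch`, `Id` and `next` are hypothetical names introduced only for illustration, and a `batchSize` of 0 is an assumed convention here for "not batched".

```java
// Standalone model of the advancement rule described above; not Pulsar code.
public final class BatchOffsetSketch {

  /** Hypothetical stand-in for a message id. batchSize == 0 means "not batched". */
  static final class Id {
    final long ledgerId;
    final long entryId;
    final int batchIndex;
    final int batchSize;

    Id(long ledgerId, long entryId, int batchIndex, int batchSize) {
      this.ledgerId = ledgerId;
      this.entryId = entryId;
      this.batchIndex = batchIndex;
      this.batchSize = batchSize;
    }

    @Override
    public String toString() {
      return ledgerId + ":" + entryId + ":" + batchIndex;
    }
  }

  /** Returns the id expected to follow {@code current}. */
  static Id next(Id current) {
    boolean batched = current.batchSize > 0;
    if (batched && current.batchIndex < current.batchSize - 1) {
      // Still inside the batch: keep the entry id, bump the batch index.
      return new Id(current.ledgerId, current.entryId,
          current.batchIndex + 1, current.batchSize);
    }
    // Last message of a batch, or a non-batched message: move to the next entry.
    // The batch index restarts at 0 for batches; -1 marks "no batch index" here.
    return new Id(current.ledgerId, current.entryId + 1,
        batched ? 0 : -1, current.batchSize);
  }

  public static void main(String[] args) {
    Id id = new Id(7, 42, 0, 3); // first message of a batch of 3
    for (int i = 0; i < 4; i++) {
      System.out.println(id);
      id = next(id);
    }
  }
}
```

For a batch of three messages in entry 42, this prints `7:42:0`, `7:42:1`, `7:42:2` and then `7:43:0`, whereas incrementing the entry id on every message would skip from `7:42:0` straight to entry 43.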


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


