liudezhi2098 opened a new pull request #6404: fix when send a delayed message, there is a case when a consumer rest…
URL: https://github.com/apache/pulsar/pull/6404
 
 
   **Motivation**
   Fix a case where, after a delayed message is sent, a consumer that restarts
pulls duplicate messages. #6403
   
   *read entry*
   ```java
   //org.apache.pulsar.client.impl.MessageImpl
   Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

   if (!messagesToReplayNow.isEmpty()) {
       if (log.isDebugEnabled()) {
           log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
                   consumerList.size());
       }

       havePendingReplayRead = true;
       Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
       // clear already acked positions from replay bucket

       deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
               ((PositionImpl) position).getEntryId()));
       // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
       // next entries as readCompletedEntries-callback was never called
       if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
           havePendingReplayRead = false;
           readMoreEntries();
       }
   } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
       log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
               totalUnackedMessages, maxUnackedMessages);
   } else if (!havePendingRead) {
       if (log.isDebugEnabled()) {
           log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
                   consumerList.size());
       }
       havePendingRead = true;
       cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal);
   } else {
       log.debug("[{}] Cannot schedule next read until previous one is done", name);
   }
   ```
   Order of reading messages:
   1. getMessagesToReplayNow
   2. asyncReadEntriesOrWait
   
   *send to consumer*
   ```
   SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
   List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);

   EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
   filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);

   c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
           sendMessageInfo.getTotalBytes(), redeliveryTracker);
   ```
   
   ###### normal situation
   1. ReadType.Normal
      The delay time has not arrived yet, so the filterEntriesForConsumer
      method filters the message out.
   2. ReadType.Replay
      The delay time has arrived, and the message is sent to the consumer.

   ###### abnormal situation
   The consumer is stopped before the delay time arrives and is started
   again after the delay time has arrived.
   1. ReadType.Replay
      The delay time has arrived, and the message is sent to the consumer.
   2. ReadType.Normal
      The delay time has arrived, so the message passes the filter and is sent
      to the consumer a second time. This is the duplicate.
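
   The two situations above can be reproduced with a small toy model. This is
   only a sketch: `DelayedReplaySketch`, its fields, and the time values are
   hypothetical and are not Pulsar classes. It also assumes that a consumer
   restart rewinds the read position while the delayed position stays tracked
   for replay, which the description above implies but does not state.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;

   // Hypothetical toy model of the dispatcher; none of these names are Pulsar classes.
   class DelayedReplaySketch {
       static final long ENTRY = 0L;        // position of the single delayed message
       static final long DELIVER_AT = 100L; // time at which its delay expires

       long readPosition = 0L;                        // next position for a Normal read
       final TreeSet<Long> delayed = new TreeSet<>(); // positions waiting for a Replay read
       final List<String> sent = new ArrayList<>();   // deliveries, in order

       // Same order as above: a Replay read if something is due, otherwise a Normal read.
       void readMoreEntries(long now) {
           if (!delayed.isEmpty() && now >= DELIVER_AT) {
               for (long p : delayed) {
                   sent.add("Replay:" + p);
               }
               delayed.clear();
           } else if (readPosition <= ENTRY) {
               long p = readPosition++;
               if (now < DELIVER_AT) {
                   delayed.add(p);          // filtered out, kept for a later Replay read
               } else {
                   sent.add("Normal:" + p); // delay already expired, passes the filter
               }
           }
       }

       // ASSUMPTION: a restart rewinds readPosition but keeps the tracked positions.
       void restartConsumer() {
           readPosition = 0L;
       }

       static List<String> normalRun() {
           DelayedReplaySketch d = new DelayedReplaySketch();
           d.readMoreEntries(10);  // Normal read, filtered
           d.readMoreEntries(100); // Replay read, delivered
           d.readMoreEntries(101); // nothing left to read
           return d.sent;
       }

       static List<String> restartRun() {
           DelayedReplaySketch d = new DelayedReplaySketch();
           d.readMoreEntries(10);  // Normal read, filtered
           d.restartConsumer();    // consumer stops and starts again
           d.readMoreEntries(100); // Replay read, delivered
           d.readMoreEntries(101); // Normal read of the same position, delivered again
           return d.sent;
       }

       public static void main(String[] args) {
           System.out.println("normal:  " + normalRun());
           System.out.println("restart: " + restartRun());
       }
   }
   ```

   In this model `normalRun()` delivers the message once through Replay, while
   `restartRun()` delivers it once through Replay and once more through Normal.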
   
   
   
   **Changes**
   When a replayed position equals readPosition, advance readPosition to the
   next position so that the Normal read does not return the same entry again.
   ```java
   positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))
           .forEach(p -> {
               if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
                   this.setReadPosition(this.readPosition.getNext());
                   log.warn("[{}][{}] replayPosition {} equals readPosition {}, need to set next readPosition",
                           ledger.getName(), name, (PositionImpl) p, this.readPosition);
               }
               ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
           });
   ```
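
   The effect of the check can be illustrated in isolation. The sketch below
   is not the Pulsar cursor: `ReplayReadPositionSketch`, the `advanceOnMatch`
   flag, and the use of plain `long` positions are assumptions made for
   illustration. It only shows that advancing the read position when it
   equals a replayed position keeps the following Normal read from returning
   that entry again.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for a cursor; positions are plain longs here.
   class ReplayReadPositionSketch {
       long readPosition;                         // next position for a Normal read
       final List<Long> sent = new ArrayList<>(); // positions delivered, in order

       ReplayReadPositionSketch(long readPosition) {
           this.readPosition = readPosition;
       }

       // Replay read. With advanceOnMatch, mimic the check in the change:
       // a replayed position equal to readPosition moves readPosition forward.
       void replay(List<Long> positions, boolean advanceOnMatch) {
           for (long p : positions) {
               if (advanceOnMatch && p == readPosition) {
                   readPosition = p + 1;
               }
               sent.add(p);
           }
       }

       // Normal read of everything from readPosition up to lastPosition, inclusive.
       void normalRead(long lastPosition) {
           while (readPosition <= lastPosition) {
               sent.add(readPosition++);
           }
       }

       // Replays position 5 while readPosition is also 5, then reads up to 6.
       static List<Long> run(boolean advanceOnMatch) {
           ReplayReadPositionSketch s = new ReplayReadPositionSketch(5L);
           s.replay(List.of(5L), advanceOnMatch);
           s.normalRead(6L);
           return s.sent;
       }

       public static void main(String[] args) {
           System.out.println("without check: " + run(false));
           System.out.println("with check:    " + run(true));
       }
   }
   ```

   Without the check, `run(false)` delivers position 5 twice (`[5, 5, 6]`);
   with it, `run(true)` delivers each position once (`[5, 6]`).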
