liudezhi2098 opened a new pull request #6404: fix when send a delayed message ,there is a case when a consumer rest…
URL: https://github.com/apache/pulsar/pull/6404

**Motivation**

Fix a case where, after a delayed message has been sent, a consumer that restarts pulls duplicate messages. #6403

*read entry*

```java
//org.apache.pulsar.client.impl.MessageImpl
Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
                consumerList.size());
    }

    havePendingReplayRead = true;
    Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
    // clear already acked positions from replay bucket
    deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
            ((PositionImpl) position).getEntryId()));
    // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
    // next entries as readCompletedEntries-callback was never called
    if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
        havePendingReplayRead = false;
        readMoreEntries();
    }
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
    log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
            totalUnackedMessages, maxUnackedMessages);
} else if (!havePendingRead) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
                consumerList.size());
    }
    havePendingRead = true;
    cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal);
} else {
    log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
```

Order of reading messages:

1. getMessagesToReplayNow
2. asyncReadEntriesOrWait

*send consumer*

```java
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);

c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
        sendMessageInfo.getTotalBytes(), redeliveryTracker);
```

###### normal situation

1. ReadType.Normal: the delay time has not arrived yet, so the filterEntriesForConsumer method filters the message out.
2. ReadType.Replay: the delay time has arrived, and the message is sent to the consumer.

###### abnormal situation

The consumer is stopped before the delay time arrives, and is started again after the delay time has arrived.

1. ReadType.Replay: the delay time has arrived, and the message is sent to the consumer.
2. ReadType.Normal: the delay time has arrived, and the message is sent to the consumer a second time.

**Changes**

When a replayed position equals readPosition, update readPosition to the next position.

```java
positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))
        .forEach(p -> {
            if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
                this.setReadPosition(this.readPosition.getNext());
                log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPosition",
                        ledger.getName(), name, (PositionImpl) p, this.readPosition);
            }
            ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
        });
```
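The abnormal situation and the effect of the change can be illustrated with a toy model. The sketch below does not use any Pulsar classes: `ToyCursor`, `replay`, `readNormal`, `deliverAfterRestart` and the `advanceOnReplay` flag are hypothetical names, positions are plain `long` values, and it assumes that after the restart the read position points at the same delayed entry that is also queued for replay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only: none of these types are Pulsar APIs.
class ReplayDedupSketch {

    /** Hypothetical cursor whose entries live at positions 0..lastEntry. */
    static final class ToyCursor {
        long readPosition;
        final long lastEntry;
        final boolean advanceOnReplay; // true models the change in this PR

        ToyCursor(long readPosition, long lastEntry, boolean advanceOnReplay) {
            this.readPosition = readPosition;
            this.lastEntry = lastEntry;
            this.advanceOnReplay = advanceOnReplay;
        }

        /** Replay read: delivers every requested position. */
        List<Long> replay(TreeSet<Long> positions) {
            List<Long> delivered = new ArrayList<>();
            for (long p : positions) {
                // The fix: skip past a position that the replay is about to deliver.
                if (advanceOnReplay && p == readPosition) {
                    readPosition = p + 1;
                }
                delivered.add(p);
            }
            return delivered;
        }

        /** Normal read: delivers up to max entries starting at readPosition. */
        List<Long> readNormal(int max) {
            List<Long> delivered = new ArrayList<>();
            while (delivered.size() < max && readPosition <= lastEntry) {
                delivered.add(readPosition++);
            }
            return delivered;
        }
    }

    /** Consumer restarts after the delay expired: replay first, then a normal read. */
    static List<Long> deliverAfterRestart(boolean advanceOnReplay) {
        // Assumption: entry 5 is the delayed message and readPosition also points at it.
        ToyCursor cursor = new ToyCursor(5, 5, advanceOnReplay);
        TreeSet<Long> toReplay = new TreeSet<>();
        toReplay.add(5L);

        List<Long> delivered = new ArrayList<>(cursor.replay(toReplay));
        delivered.addAll(cursor.readNormal(10));
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + deliverAfterRestart(false)); // [5, 5]
        System.out.println("with fix:    " + deliverAfterRestart(true));  // [5]
    }
}
```

In this model the duplicate appears only because both read paths cover the same position; moving the read position forward during the replay leaves nothing for the normal read to deliver again.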
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
