baomingyu commented on a change in pull request #10285:
URL: https://github.com/apache/pulsar/pull/10285#discussion_r643027360
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
maxReadPosition = minReadPositionForRecentJoinedConsumer;
}
}
+
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it has joined.
+ int messageNum = 0;
for (int i = 0; i < maxMessages; i++) {
if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
// We have already crossed the divider line. All messages in the list are now
// newer than what we can currently dispatch to this consumer
- return i;
+ messageNum = i;
+ break;
}
}
- return maxMessages;
+ if (messageNum == 0) {
+ messageNum = maxMessages;
+ CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+ for (Consumer consumer1 : consumers) {
+ if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+ entries.get(0).getEntryId())) {
+ messageNum = 0;
+ break;
+ }
+ }
+ }
+ return messageNum;
+ }
+
+ private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {
+ if (consuemr != null && consuemr.getPendingAcks().get(ledgerId, entryId) != null) {
Review comment:
This problem occasionally occurs when restarting consumers that use
Key_Shared mode.
It has appeared many times in our online clusters. After this
modification, the problem was resolved.
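
For reference, here is a minimal standalone sketch of the check this change adds, under simplifying assumptions. `Position`, `restrictedMaxEntries`, and the per-consumer sets of pending acks are hypothetical stand-ins for `PositionImpl`, `getRestrictedMaxEntriesForConsumer`, and `Consumer.getPendingAcks()`; this is not the broker code, only the control flow of the hunk above.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class RestrictedDispatchSketch {

    // Hypothetical stand-in for PositionImpl: ordered by ledger id, then entry id.
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int c = Long.compare(ledgerId, other.ledgerId);
            return c != 0 ? c : Long.compare(entryId, other.entryId);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Position)) {
                return false;
            }
            Position p = (Position) o;
            return ledgerId == p.ledgerId && entryId == p.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    // Mirrors the control flow of the hunk: find the first entry at or past
    // maxReadPosition; if none was recorded, allow maxMessages unless the
    // first entry is still in some consumer's pending acks.
    static int restrictedMaxEntries(List<Position> entries, Position maxReadPosition,
                                    int maxMessages, List<Set<Position>> pendingAcksPerConsumer) {
        int messageNum = 0;
        for (int i = 0; i < maxMessages; i++) {
            if (entries.get(i).compareTo(maxReadPosition) >= 0) {
                // Crossed the divider line: entries from index i on are too new.
                messageNum = i;
                break;
            }
        }
        if (messageNum == 0) {
            messageNum = maxMessages;
            Position first = entries.get(0);
            for (Set<Position> pendingAcks : pendingAcksPerConsumer) {
                // Assumed equivalent of isApendMessage(consumer, ledgerId, entryId).
                if (pendingAcks != null && pendingAcks.contains(first)) {
                    messageNum = 0;
                    break;
                }
            }
        }
        return messageNum;
    }
}
```

In this sketch, as in the hunk, 0 doubles as the "no cut-off found" marker, so a cut-off at index 0 takes the same path as no cut-off at all.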