poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1515671343
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -219,6 +242,24 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
}
+
+ // Update if the markDeletePosition move forward
+ updateIfNeededAndGetLastSentPosition();
+
+ // Should not access to individualDeletedMessages from outside
managed cursor
+ // because it doesn't guarantee thread safety.
+ if (lastSentPosition == null) {
Review Comment:
Just mark this for easier reading: initialize `lastSentPosition &
individuallySentPositions ` after the dispatcher created or the first consumer
joined.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -308,6 +361,62 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
Review Comment:
Just mark this for easier reading: update `lastSentPosition` to the position
of the first consecutive sent message
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void>
addConsumer(Consumer consumer) {
})
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this)
{
- PositionImpl readPositionWhenJoining = (PositionImpl)
cursor.getReadPosition();
- consumer.setReadPositionWhenJoining(readPositionWhenJoining);
- // If this was the 1st consumer, or if all the messages are
already acked, then we
- // don't need to do anything special
- if (!allowOutOfOrderDelivery
- && recentlyJoinedConsumers != null
- && consumerList.size() > 1
- &&
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
- recentlyJoinedConsumers.put(consumer,
readPositionWhenJoining);
+ if (!allowOutOfOrderDelivery) {
+ final PositionImpl lastSentPositionWhenJoining =
updateIfNeededAndGetLastSentPosition();
+ if (lastSentPositionWhenJoining != null) {
+
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+ // If this was the 1st consumer, or if all the
messages are already acked, then we
+ // don't need to do anything special
+ if (recentlyJoinedConsumers != null
+ && consumerList.size() > 1
+ &&
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+ recentlyJoinedConsumers.put(consumer,
lastSentPositionWhenJoining);
Review Comment:
After consumers redelivery messages, you did not remove them out of
`individuallySentPositions`, right? So the mechanism of "Calculate the lastest
position sent" could not work as expected, right?
Suggestion: when a new consumer joins, we can calculate "the lastest
position sent" in real-time relying on these variables:
- `cursor.individualDeletedMessages`
- `dispatcher.redeliveryMessages`
- `consumer.pendingAcks`
Then the new variable `individuallySentPositions ` can be removed, and we do
not need to update it in-time, which makes the logic simpler
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -276,12 +317,24 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
if (messagesForC > 0) {
- // remove positions first from replay list first :
sendMessages recycles entries
- if (readType == ReadType.Replay) {
- for (int i = 0; i < messagesForC; i++) {
- Entry entry = entriesWithSameKey.get(i);
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ for (int i = 0; i < messagesForC; i++) {
+ final Entry entry = entriesWithSameKey.get(i);
+ // remove positions first from replay list first :
sendMessages recycles entries
+ if (readType == ReadType.Replay) {
redeliveryMessages.remove(entry.getLedgerId(),
entry.getEntryId());
}
+ // Add positions to individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery) {
Review Comment:
Just mark this for easier reading: record the positions sent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]