equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1519261144
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
})
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
- PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
- consumer.setReadPositionWhenJoining(readPositionWhenJoining);
- // If this was the 1st consumer, or if all the messages are already acked, then we
- // don't need to do anything special
- if (!allowOutOfOrderDelivery
- && recentlyJoinedConsumers != null
- && consumerList.size() > 1
- && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
- recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
+ if (!allowOutOfOrderDelivery) {
+ final PositionImpl lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition();
+ if (lastSentPositionWhenJoining != null) {
+ consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+ // If this was the 1st consumer, or if all the messages are already acked, then we
+ // don't need to do anything special
+ if (recentlyJoinedConsumers != null
+ && consumerList.size() > 1
+ && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+ recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining);
Review Comment:
> After consumers redeliver messages, you did not remove them from `individuallySentPositions`, right?

Yes.

> So the mechanism of "calculate the latest position sent" could not work as expected, right?

If we also care about redelivered messages, then yes. However, I don't think
we need to handle them fully, because we can't control the order of client-side
redelivery operations (e.g. a request to redeliver x:1 arriving after a request
to redeliver x:2).
Of course, we should at least handle broker-side redelivery operations
like the ones below.
https://github.com/apache/pulsar/blob/c36c18d44fd4df07bce9b7961c59685e0e91a420/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L196
https://github.com/apache/pulsar/blob/c36c18d44fd4df07bce9b7961c59685e0e91a420/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L279-L285
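
As a rough illustration of why a redelivery does not necessarily have to move the last sent position, here is a minimal sketch. It is not the code in this PR: the class `LastSentSketch` and its methods are hypothetical, and positions are modeled as consecutive `long` entry ids in a single ledger rather than as `PositionImpl`. It assumes the last sent position only advances over a contiguous run of sent positions, and that sends at or before it are ignored.

```java
import java.util.TreeSet;

// Hypothetical, simplified model; NOT the implementation in this PR.
// Assumption: positions are consecutive long entry ids in one ledger.
class LastSentSketch {
    // Highest position such that every position up to it has been sent.
    private long lastSent;
    // Positions sent out of order, i.e. beyond lastSent with a gap before them.
    private final TreeSet<Long> individuallySent = new TreeSet<>();

    LastSentSketch(long initialLastSent) {
        this.lastSent = initialLastSent;
    }

    // Record a dispatch. A redelivery of a position at or before lastSent
    // is ignored, so the order of redelivery requests does not matter here.
    void markSent(long position) {
        if (position <= lastSent) {
            return;
        }
        individuallySent.add(position);
        // Fold any now-contiguous positions into lastSent.
        while (individuallySent.remove(lastSent + 1)) {
            lastSent++;
        }
    }

    long getLastSent() {
        return lastSent;
    }

    int pendingCount() {
        return individuallySent.size();
    }

    public static void main(String[] args) {
        LastSentSketch sketch = new LastSentSketch(0);
        sketch.markSent(2); // gap at 1, lastSent stays at 0
        sketch.markSent(1); // gap filled, lastSent advances to 2
        sketch.markSent(1); // redelivery of an already covered position
        System.out.println(sketch.getLastSent()); // prints 2
    }
}
```

In this model a client-side request to redeliver x:1 after x:2 leaves the tracked value unchanged. Broker-side redelivery paths such as the two linked above would still need explicit handling in the real dispatcher.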

> Suggestion: when a new consumer joins, we can calculate "the latest position sent" in real-time relying on these variables:

I'll consider it. Thanks. My initial questions are:
* Is it equivalent to the current approach?
* Does it have a significant impact on performance?
* Can useful stats be implemented?