lhotari commented on code in PR #23352:
URL: https://github.com/apache/pulsar/pull/23352#discussion_r1797733202
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -316,60 +294,6 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- // Update the last sent position and remove ranges from
individuallySentPositions if necessary
- if (recentlyJoinedConsumerTrackingRequired && lastSentPosition !=
null) {
- final ManagedLedger managedLedger = cursor.getManagedLedger();
- com.google.common.collect.Range<Position> range =
individuallySentPositions.firstRange();
-
- // If the upper bound is before the last sent position, we need to
move ahead as these
- // individuallySentPositions are now irrelevant.
- if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
-
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
- lastSentPosition.getEntryId());
- range = individuallySentPositions.firstRange();
- }
-
- if (range != null) {
- // If the lowerBound is ahead of the last sent position,
- // verify if there are any entries in-between.
- if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
-
.getNumberOfEntries(com.google.common.collect.Range.openClosed(lastSentPosition,
- range.lowerEndpoint())) <= 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Found a position range to last sent:
{}", name, range);
- }
- Position newLastSentPosition = range.upperEndpoint();
- Position positionAfterNewLastSent = managedLedger
- .getNextValidPosition(newLastSentPosition);
- // sometime ranges are connected but belongs to different
ledgers
- // so, they are placed sequentially
- // eg: (2:10..3:15] can be returned as
(2:10..2:15],[3:0..3:15].
- // So, try to iterate over connected range and found the
last non-connected range
- // which gives new last sent position.
- final Position lastConfirmedEntrySnapshot =
managedLedger.getLastConfirmedEntry();
- if (lastConfirmedEntrySnapshot != null) {
- while
(positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0) {
- if
(individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(),
- positionAfterNewLastSent.getEntryId())) {
- range =
individuallySentPositions.rangeContaining(
-
positionAfterNewLastSent.getLedgerId(), positionAfterNewLastSent.getEntryId());
- newLastSentPosition = range.upperEndpoint();
- positionAfterNewLastSent =
managedLedger.getNextValidPosition(newLastSentPosition);
- // check if next valid position is also
deleted and part of the deleted-range
- continue;
- }
- break;
- }
- }
-
- if (lastSentPosition.compareTo(newLastSentPosition) < 0) {
- lastSentPosition = newLastSentPosition;
- }
-
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
- lastSentPosition.getEntryId());
- }
- }
- }
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
Review Comment:
This is not a bug; it is explained in the comment on #23420.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]