This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4261d23824 [improve][broker] Don't use "recently joined consumers"
rules for Key_Shared in STICKY mode (#23275)
d4261d23824 is described below
commit d4261d23824412da12e27a56c0fa30250bbe6378
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Sep 19 11:14:11 2024 +0300
[improve][broker] Don't use "recently joined consumers" rules for
Key_Shared in STICKY mode (#23275)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d45b9394dd7..26463ba902c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -66,6 +66,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
private final boolean allowOutOfOrderDelivery;
private final StickyKeyConsumerSelector selector;
+ private final boolean recentlyJoinedConsumerTrackingRequired;
private boolean skipNextReplayToTriggerLookAhead = false;
private final KeySharedMode keySharedMode;
@@ -90,10 +91,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
- this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
- this.individuallySentPositions =
- allowOutOfOrderDelivery ? null : new
ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
this.keySharedMode = ksm.getKeySharedMode();
+ // recent joined consumer tracking is required only for AUTO_SPLIT
mode when out-of-order delivery is disabled
+ this.recentlyJoinedConsumerTrackingRequired =
+ keySharedMode == KeySharedMode.AUTO_SPLIT &&
!allowOutOfOrderDelivery;
+ this.recentlyJoinedConsumers = recentlyJoinedConsumerTrackingRequired
? new LinkedHashMap<>() : null;
+ this.individuallySentPositions =
+ recentlyJoinedConsumerTrackingRequired
+ ? new ConcurrentOpenLongPairRangeSet<>(4096,
positionRangeConverter)
+ : null;
switch (this.keySharedMode) {
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
@@ -138,7 +144,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
})
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this)
{
- if (!allowOutOfOrderDelivery) {
+ if (recentlyJoinedConsumerTrackingRequired) {
final Position lastSentPositionWhenJoining =
updateIfNeededAndGetLastSentPosition();
if (lastSentPositionWhenJoining != null) {
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
@@ -165,7 +171,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// eventually causing all consumers to get stuck.
selector.removeConsumer(consumer);
super.removeConsumer(consumer);
- if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null) {
+ if (recentlyJoinedConsumerTrackingRequired) {
recentlyJoinedConsumers.remove(consumer);
if (consumerList.size() == 1) {
recentlyJoinedConsumers.clear();
@@ -231,7 +237,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
}
}
+ }
+ if (recentlyJoinedConsumerTrackingRequired) {
// Update if the markDeletePosition move forward
updateIfNeededAndGetLastSentPosition();
@@ -273,7 +281,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
redeliveryMessages.remove(entry.getLedgerId(),
entry.getEntryId());
}
// Add positions to individuallySentPositions if necessary
- if (!allowOutOfOrderDelivery) {
+ if (recentlyJoinedConsumerTrackingRequired) {
final Position position = entry.getPosition();
// Store to individuallySentPositions even if
lastSentPosition is null
if ((lastSentPosition == null ||
position.compareTo(lastSentPosition) > 0)
@@ -306,7 +314,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
// Update the last sent position and remove ranges from
individuallySentPositions if necessary
- if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ if (recentlyJoinedConsumerTrackingRequired && lastSentPosition !=
null) {
final ManagedLedger managedLedger = cursor.getManagedLedger();
com.google.common.collect.Range<Position> range =
individuallySentPositions.firstRange();