This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea6641e3e51 [improve][broker] Replaced checkBackloggedCursors with
checkBackloggedCursor(single subscription check) upon subscription (#19343)
ea6641e3e51 is described below
commit ea6641e3e51d7681670fea111cbac34080036e1a
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Jan 28 23:19:56 2023 -0800
[improve][broker] Replaced checkBackloggedCursors with
checkBackloggedCursor(single subscription check) upon subscription (#19343)
---
.../broker/service/persistent/PersistentTopic.java | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 15bf568cbb2..ace27c3be96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -812,7 +812,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
readCompacted, keySharedMeta, startMessageId, consumerEpoch);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
- checkBackloggedCursors();
+ if (subscription instanceof PersistentSubscription persistentSubscription) {
+ checkBackloggedCursor(persistentSubscription);
+ }
if (!cnx.isActive()) {
try {
consumer.close();
@@ -2566,17 +2568,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public void checkBackloggedCursors() {
- // activate caught up cursors which include consumers
subscriptions.forEach((subName, subscription) -> {
- if (!subscription.getConsumers().isEmpty()
- && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
- subscription.getCursor().setActive();
- } else {
- subscription.getCursor().setInactive();
- }
+ checkBackloggedCursor(subscription);
});
}
+ private void checkBackloggedCursor(PersistentSubscription subscription) {
+ // activate caught up cursor which include consumers
+ if (!subscription.getConsumers().isEmpty()
+ && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
+ subscription.getCursor().setActive();
+ } else {
+ subscription.getCursor().setInactive();
+ }
+ }
+
public void checkInactiveLedgers() {
ledger.checkInactiveLedgerAndRollOver();
}