This is an automated email from the ASF dual-hosted git repository. crossoverjie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7b5150ab83d [improve][broker]clean up duplicate code and fix typo (#24069) 7b5150ab83d is described below commit 7b5150ab83dc74ea915c0f97a30971df63e65b15 Author: 3pacccccc <36988878+3paccc...@users.noreply.github.com> AuthorDate: Mon Mar 17 14:01:12 2025 +0800 [improve][broker]clean up duplicate code and fix typo (#24069) Co-authored-by: maruimin <202304...@any3.com> Co-authored-by: crossoverJie <crossover...@gmail.com> --- .../main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java | 6 +++--- .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 8 ++++++++ .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java | 6 +----- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 6 +----- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java index cdedac1136e..deae1d93a31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java @@ -39,8 +39,8 @@ public class OwnedBundle { private final NamespaceBundle bundle; /** - * {@link #nsLock} is used to protect read/write access to {@link #active} flag and the corresponding code section - * based on {@link #active} flag. + * {@link #nsLock} is used to protect read/write access to {@link #isActive} flag and the corresponding code section + * based on {@link #isActive} flag. */ @ToString.Exclude @EqualsAndHashCode.Exclude @@ -104,7 +104,7 @@ public class OwnedBundle { public CompletableFuture<Void> handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit timeoutUnit, boolean closeWithoutWaitingClientDisconnect) { long unloadBundleStartTime = System.nanoTime(); - // Need a per namespace RenetrantReadWriteLock + // Need a per namespace ReentrantReadWriteLock // Here to do a writeLock to set the flag and proceed to check and close connections try { while (!this.nsLock.writeLock().tryLock(1, TimeUnit.SECONDS)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f6397f21962..714cb7a0c47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -599,6 +599,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { return count; } + protected int getNumberOfConsumers(final Collection<? extends Subscription> subscriptions) { + int count = 0; + for (Subscription subscription : subscriptions) { + count += subscription.getConsumers().size(); + } + return count; + } + protected CompletableFuture<Void> addConsumerToSubscription(Subscription subscription, Consumer consumer) { if (isConsumersExceededOnTopic()) { log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1931a09497e..c4f1379f437 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -691,11 +691,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol @Override public int getNumberOfConsumers() { - int count = 0; - for (NonPersistentSubscription subscription : subscriptions.values()) { - count += subscription.getConsumers().size(); - } - return count; + return getNumberOfConsumers(subscriptions.values()); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2c6e2dec3ff..704cf2b393c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2263,11 +2263,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public int getNumberOfConsumers() { - int count = 0; - for (PersistentSubscription subscription : subscriptions.values()) { - count += subscription.getConsumers().size(); - } - return count; + return getNumberOfConsumers(subscriptions.values()); } @Override