Copilot commented on code in PR #25272:
URL: https://github.com/apache/pulsar/pull/25272#discussion_r2878353712


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies, 
RetentionPolicies retention) {
         return checkBacklogQuota(quota, retention);
     }
 
-    private void clearBacklog(NamespaceName nsName, String bundleRange, String 
subscription) {
-        try {
-            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
-                    nsName.toString() + "/" + bundleRange);
-
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            if (subscription != null) {
-                if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
-                    subscription = 
PersistentReplicator.getRemoteCluster(subscription);
-                }
-                for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic
-                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
-                        futures.add(((PersistentTopic) 
topic).clearBacklog(subscription));
+    private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName, 
String bundleRange, String subscription) {
+        final NamespaceBundleFactory bundleFactory = 
pulsar().getNamespaceService().getNamespaceBundleFactory();
+        final NamespaceBundle targetBundle = 
bundleFactory.getBundle(nsName.toString(), bundleRange);
+        return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)

Review Comment:
   The `clearBacklogAsync` method calls `getListOfPersistentTopics(nsName)` to 
retrieve all persistent topics in the entire namespace, then filters to only 
those belonging to `targetBundle`. For large namespaces with many topics spread 
across many bundles, this fetches all topic metadata unnecessarily. An 
optimization would be to start with the already-owned topics (from 
`getAllTopicsFromNamespaceBundle`) if the bundle IS owned, and only fall back 
to querying all namespace topics when the bundle is unloaded. The current 
approach is correct but potentially inefficient for large namespaces where the 
bundle is already loaded.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies, 
RetentionPolicies retention) {
         return checkBacklogQuota(quota, retention);
     }
 
-    private void clearBacklog(NamespaceName nsName, String bundleRange, String 
subscription) {
-        try {
-            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
-                    nsName.toString() + "/" + bundleRange);
-
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            if (subscription != null) {
-                if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
-                    subscription = 
PersistentReplicator.getRemoteCluster(subscription);
-                }
-                for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic
-                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
-                        futures.add(((PersistentTopic) 
topic).clearBacklog(subscription));
+    private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName, 
String bundleRange, String subscription) {
+        final NamespaceBundleFactory bundleFactory = 
pulsar().getNamespaceService().getNamespaceBundleFactory();
+        final NamespaceBundle targetBundle = 
bundleFactory.getBundle(nsName.toString(), bundleRange);
+        return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
+                .thenCompose(topicsInNamespace -> {
+                    List<CompletableFuture<Void>> futures = new ArrayList<>();
+                    String effectiveSubscription = subscription;
+                    if (effectiveSubscription != null
+                            && 
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
 {
+                        effectiveSubscription = 
PersistentReplicator.getRemoteCluster(effectiveSubscription);
                     }
-                }
-            } else {
-                for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic
-                            && 
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
-                        futures.add(((PersistentTopic) topic).clearBacklog());
+                    final String finalSubscription = effectiveSubscription;
+
+                    for (String topic : topicsInNamespace) {
+                        TopicName topicName = TopicName.get(topic);
+                        if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                            continue;
+                        }
+                        NamespaceBundle bundle = 
bundleFactory.getBundle(topicName);
+                        if (bundle == null || !bundle.equals(targetBundle)) {

Review Comment:
   The call `bundleFactory.getBundle(topicName)` at this line uses 
`bundlesCache.synchronous().get(namespace)`, which is a blocking call that 
could block the current thread until the cache is loaded from the metadata 
store. Since this executes inside a `thenCompose` lambda of an async pipeline, 
blocking an async thread could degrade throughput for concurrent admin 
requests. Consider using the async variant 
`bundleFactory.getBundlesAsync(topicName.getNamespaceObject()).thenApply(bundles
 -> bundles.findBundle(topicName))` and restructuring the loop as a stream of 
CompletableFutures to keep the pipeline fully non-blocking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to