nodece commented on code in PR #25276:
URL: https://github.com/apache/pulsar/pull/25276#discussion_r2870314702


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1918,32 +1899,52 @@ private void clearBacklog(NamespaceName nsName, String 
bundleRange, String subsc
         }
     }
 
-    private void unsubscribe(NamespaceName nsName, String bundleRange, String 
subscription) {
-        try {
-            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
-                    nsName.toString() + "/" + bundleRange);
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
-                throw new RestException(Status.PRECONDITION_FAILED, "Cannot 
unsubscribe a replication cursor");
-            } else {
-                for (Topic topic : topicList) {
-                    Subscription sub = topic.getSubscription(subscription);
-                    if (sub != null) {
-                        futures.add(sub.delete());
-                    }
-                }
-            }
-            FutureUtil.waitForAll(futures).get();
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", 
clientAppId(), subscription,
-                    nsName.toString(), bundleRange, e);
-            if (e.getCause() instanceof SubscriptionBusyException) {
-                throw new RestException(Status.PRECONDITION_FAILED, 
"Subscription has active connected consumers");
-            }
-            throw new RestException(e.getCause());
+    private CompletableFuture<Void> unsubscribeAsync(NamespaceName nsName, 
String bundleRange, String subscription) {
+        if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
+            return CompletableFuture.failedFuture(
+                    new RestException(Status.PRECONDITION_FAILED, "Cannot 
unsubscribe a replication cursor"));
         }
+
+        return pulsar().getNamespaceService().getFullListOfTopics(nsName)
+                .thenCompose(topicsInNamespace -> {
+                    List<CompletableFuture<Void>> futures = new ArrayList<>();
+                    NamespaceBundleFactory bundleFactory =
+                            
pulsar().getNamespaceService().getNamespaceBundleFactory();
+                    NamespaceBundle targetBundle = 
bundleFactory.getBundle(nsName.toString(), bundleRange);
+
+                    for (String topic : topicsInNamespace) {
+                        TopicName topicName = TopicName.get(topic);
+                        if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                            continue;
+                        }
+                        NamespaceBundle bundle = 
bundleFactory.getBundle(topicName);
+                        if (bundle == null || !bundle.equals(targetBundle)) {
+                            continue;
+                        }
+                        
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
+                                .thenCompose(optTopic -> {
+                                    if (optTopic.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    Topic loaded = optTopic.get();
+                                    Subscription sub = 
loaded.getSubscription(subscription);
+                                    if (sub == null) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    return sub.delete();
+                                }));
+                    }
+                    return FutureUtil.waitForAll(futures);
+                }).exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to unsubscribe {} for namespace 
{}/{}", clientAppId(), subscription,

Review Comment:
   This error-logging block is duplicated in the base class and in both
versioned implementations (v1 and v2); consider extracting it into a single
shared helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to