This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f83251a57fd [fix][broker] Support namespace unsubscribe when bundles are unloaded (#25276)
f83251a57fd is described below

commit f83251a57fd5f4bc20cd47ec7b950d6321d03488
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Mar 6 02:27:14 2026 +0800

    [fix][broker] Support namespace unsubscribe when bundles are unloaded (#25276)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 156 +++++++--------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  29 ++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  29 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 215 +++++++++++++++++++++
 4 files changed, 328 insertions(+), 101 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bfbd6523481..b9ec275f79c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1667,70 +1667,51 @@ public abstract class NamespacesBase extends AdminResource {
                 subscription, namespaceName, bundleRange);
     }
 
-    protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, 
String subscription,
-                                                boolean authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.UNSUBSCRIBE);
+    protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String 
subscription,
+                                                                        
boolean authoritative) {
         checkNotNull(subscription, "Subscription should not be null");
 
-        final List<CompletableFuture<Void>> futures = new ArrayList<>();
-        try {
-            NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundles(namespaceName);
-            for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                // check if the bundle is owned by any broker, if not then 
there are no subscriptions
-                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
-                    
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
-                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
-                }
-            }
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-            return;
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-            return;
-        }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                log.warn("[{}] Failed to unsubscribe {} on the bundles for 
namespace {}: {}", clientAppId(),
-                        subscription, namespaceName, 
exception.getCause().getMessage());
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                    return null;
-                }
-            }
-            log.info("[{}] Successfully unsubscribed {} on all the bundles for 
namespace {}", clientAppId(),
-                    subscription, namespaceName);
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        });
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                        .getBundlesAsync(namespaceName))
+                .thenCompose(bundles -> {
+                    final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
+                    for (NamespaceBundle nsBundle : bundles.getBundles()) {
+                        try {
+                            
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
+                                    namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
+                        } catch (PulsarServerException e) {
+                            return CompletableFuture.failedFuture(e);
+                        }
+                    }
+                    return FutureUtil.waitForAll(futures);
+                }).thenRun(() -> log.info("[{}] Successfully unsubscribed {} 
on all the bundles for namespace {}",
+                        clientAppId(), subscription, namespaceName));
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalUnsubscribeNamespaceBundle(String subscription, 
String bundleRange, boolean authoritative) {
-        validateNamespaceOperation(namespaceName, 
NamespaceOperation.UNSUBSCRIBE);
+    protected CompletableFuture<Void> 
internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
+                                                                              
boolean authoritative) {
         checkNotNull(subscription, "Subscription should not be null");
         checkNotNull(bundleRange, "BundleRange should not be null");
 
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        if (namespaceName.isGlobal()) {
-            // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
-            validateGlobalNamespaceOwnership(namespaceName);
-        } else {
-            validateClusterOwnership(namespaceName.getCluster());
-            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
-        }
-
-        validateNamespaceBundleOwnership(namespaceName, policies.bundles, 
bundleRange, authoritative, true);
-
-        unsubscribe(namespaceName, bundleRange, subscription);
-        log.info("[{}] Successfully unsubscribed {} on namespace bundle 
{}/{}", clientAppId(), subscription,
-                namespaceName, bundleRange);
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
+                .thenCompose(__ -> {
+                    if (namespaceName.isGlobal()) {
+                        // check cluster ownership for a given global 
namespace: redirect if peer-cluster owns it
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    }
+                    return 
validateClusterOwnershipAsync(namespaceName.getCluster())
+                            .thenCompose(unused -> 
validateClusterForTenantAsync(namespaceName.getTenant(),
+                                    namespaceName.getCluster()));
+                })
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies ->
+                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
+                                authoritative, false))
+                .thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
+                .thenRun(() -> log.info("[{}] Successfully unsubscribed {} on 
namespace bundle {}/{}",
+                        clientAppId(), subscription, namespaceName, 
bundleRange));
     }
 
     protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode 
subscriptionAuthMode) {
@@ -1918,32 +1899,45 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    private void unsubscribe(NamespaceName nsName, String bundleRange, String 
subscription) {
-        try {
-            List<Topic> topicList = 
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
-                    nsName.toString() + "/" + bundleRange);
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
-                throw new RestException(Status.PRECONDITION_FAILED, "Cannot 
unsubscribe a replication cursor");
-            } else {
-                for (Topic topic : topicList) {
-                    Subscription sub = topic.getSubscription(subscription);
-                    if (sub != null) {
-                        futures.add(sub.delete());
-                    }
-                }
-            }
-            FutureUtil.waitForAll(futures).get();
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", 
clientAppId(), subscription,
-                    nsName.toString(), bundleRange, e);
-            if (e.getCause() instanceof SubscriptionBusyException) {
-                throw new RestException(Status.PRECONDITION_FAILED, 
"Subscription has active connected consumers");
-            }
-            throw new RestException(e.getCause());
+    private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle, 
String subscription) {
+        if 
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
+            return CompletableFuture.failedFuture(
+                    new RestException(Status.PRECONDITION_FAILED, "Cannot 
unsubscribe a replication cursor"));
         }
+
+        return 
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
+                .thenCompose(topicsInBundle -> {
+                    List<CompletableFuture<Void>> futures = new ArrayList<>();
+                    for (String topic : topicsInBundle) {
+                        TopicName topicName = TopicName.get(topic);
+                        if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                            continue;
+                        }
+                        
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
+                                .thenCompose(optTopic -> {
+                                    if (optTopic.isEmpty()) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    Topic loaded = optTopic.get();
+                                    Subscription sub = 
loaded.getSubscription(subscription);
+                                    if (sub == null) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                    return sub.delete();
+                                }));
+                    }
+                    return FutureUtil.waitForAll(futures);
+                }).exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof RestException) {
+                        throw (RestException) cause;
+                    }
+                    if (cause instanceof SubscriptionBusyException) {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Subscription has active connected consumers");
+                    }
+                    throw new RestException(cause);
+                });
     }
 
     protected BundlesData validateBundlesData(BundlesData initialBundles) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index cad6899c8a2..1809456476c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1446,14 +1446,15 @@ public class Namespaces extends NamespacesBase {
             @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalUnsubscribeNamespace(asyncResponse, subscription, 
authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalUnsubscribeNamespaceAsync(subscription, authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to unsubscribe {} on namespace {}", 
clientAppId(),
+                            subscription, namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1462,12 +1463,20 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void unsubscribeNamespaceBundle(@PathParam("property") String 
property, @PathParam("cluster") String cluster,
+    public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(property, cluster, namespace);
-        internalUnsubscribeNamespaceBundle(subscription, bundleRange, 
authoritative);
+        internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, 
authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to unsubscribe {} on namespace 
bundle {}/{}", clientAppId(),
+                            subscription, namespaceName, bundleRange, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 90f4b087bfe..53501d16f48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1588,14 +1588,15 @@ public class Namespaces extends NamespacesBase {
             @PathParam("namespace") String namespace,
             @PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalUnsubscribeNamespace(asyncResponse, subscription, 
authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalUnsubscribeNamespaceAsync(subscription, authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to unsubscribe {} on namespace {}", 
clientAppId(),
+                            subscription, namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1605,12 +1606,20 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 204, message = "Operation successful"),
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant,
+    public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalUnsubscribeNamespaceBundle(subscription, bundleRange, 
authoritative);
+        internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, 
authoritative)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to unsubscribe {} on namespace 
bundle {}/{}", clientAppId(),
+                            subscription, namespaceName, bundleRange, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 663a6f0a419..8dabc75e52d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2339,6 +2339,221 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 new ArrayList<>());
     }
 
+    @Test(dataProvider = "numBundles")
+    public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer 
numBundles) throws Exception {
+        String namespace = "prop-xyz/ns-unsub-bundle";
+        admin.namespaces().createNamespace(namespace, numBundles);
+
+        String topic = "persistent://" + namespace + "/t1";
+        String subscription = "sub1";
+        String otherSubscription = "sub2";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        consumer1.close();
+        consumer2.close();
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
+
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by current broker");
+                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by other broker");
+                });
+
+        admin.namespaces().unsubscribeNamespaceBundle(namespace, 
bundle.getBundleRange(), subscription);
+
+        List<String> subscriptions =
+                
admin.topics().getSubscriptions(topic).stream().sorted().toList();
+        assertEquals(subscriptions, List.of(otherSubscription));
+    }
+
+    @Test(dataProvider = "numBundles")
+    public void testUnsubscribeNamespaceOnUnloadedBundle(Integer numBundles) 
throws Exception {
+        String namespace = "prop-xyz/ns-unsub-namespace";
+        admin.namespaces().createNamespace(namespace, numBundles);
+
+        String topic = "persistent://" + namespace + "/t1";
+        String subscription = "sub1";
+        String otherSubscription = "sub2";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        consumer1.close();
+        consumer2.close();
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
+
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by current broker");
+                    
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                            "Bundle should not be owned by other broker");
+                });
+
+        admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+        List<String> subscriptions =
+                
admin.topics().getSubscriptions(topic).stream().sorted().toList();
+        assertEquals(subscriptions, List.of(otherSubscription));
+    }
+
+    @Test(dataProvider = "numBundles")
+    public void 
testUnsubscribeNamespaceOnUnloadedBundleWithPartitionedTopic(Integer 
numBundles) throws Exception {
+        String namespace = "prop-xyz/ns-unsub-partitioned";
+        admin.namespaces().createNamespace(namespace, numBundles);
+
+        String topic = "persistent://" + namespace + "/pt";
+        int partitions = 3;
+        admin.topics().createPartitionedTopic(topic, partitions);
+
+        String subscription = "sub1";
+        String otherSubscription = "sub2";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        consumer1.close();
+        consumer2.close();
+
+        Set<NamespaceBundle> bundles = new HashSet<>();
+        for (int i = 0; i < partitions; i++) {
+            String partitionTopic = 
TopicName.get(topic).getPartition(i).toString();
+            
bundles.add(pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(TopicName.get(partitionTopic)));
+        }
+        for (NamespaceBundle bundle : bundles) {
+            admin.namespaces().unloadNamespaceBundle(namespace, 
bundle.getBundleRange());
+        }
+
+        for (NamespaceBundle bundle : bundles) {
+            Awaitility.await()
+                    .atMost(30, TimeUnit.SECONDS)
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> {
+                        
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                                "Bundle should not be owned by current 
broker");
+                        
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                                "Bundle should not be owned by other broker");
+                    });
+        }
+
+        admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+        for (int i = 0; i < partitions; i++) {
+            String partitionTopic = 
TopicName.get(topic).getPartition(i).toString();
+            List<String> subs = 
admin.topics().getSubscriptions(partitionTopic).stream().sorted().toList();
+            assertEquals(subs, List.of(otherSubscription));
+        }
+    }
+
+    @Test
+    public void 
testUnsubscribeNamespaceOnUnloadedBundleWithMultiTopicCrossBundle() throws 
Exception {
+        String namespace = "prop-xyz/ns-unsub-cross-bundle";
+        admin.namespaces().createNamespace(namespace, 4);
+
+        String subscription = "sub1";
+        String otherSubscription = "sub2";
+
+        String topic1 = null;
+        String topic2 = null;
+        NamespaceBundle bundle1 = null;
+        NamespaceBundle bundle2 = null;
+        for (int i = 0; i < 50; i++) {
+            String candidate = "persistent://" + namespace + "/t-" + i;
+            @Cleanup
+            Consumer<byte[]> c = pulsarClient.newConsumer()
+                    .topic(candidate)
+                    .subscriptionName(subscription)
+                    .subscribe();
+            c.close();
+            NamespaceBundle bundle =
+                    
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(candidate));
+            if (topic1 == null) {
+                topic1 = candidate;
+                bundle1 = bundle;
+                continue;
+            }
+            if (!bundle.equals(bundle1)) {
+                topic2 = candidate;
+                bundle2 = bundle;
+                break;
+            }
+        }
+        assertNotNull(topic1);
+        assertNotNull(topic2);
+        assertNotNull(bundle1);
+        assertNotNull(bundle2);
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic1)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic2)
+                .subscriptionName(otherSubscription)
+                .subscribe();
+        consumer1.close();
+        consumer2.close();
+
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle1.getBundleRange());
+        admin.namespaces().unloadNamespaceBundle(namespace, 
bundle2.getBundleRange());
+
+        for (NamespaceBundle bundle : List.of(bundle1, bundle2)) {
+            Awaitility.await()
+                    .atMost(30, TimeUnit.SECONDS)
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> {
+                        
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                                "Bundle should not be owned by current 
broker");
+                        
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+                                "Bundle should not be owned by other broker");
+                    });
+        }
+
+        admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+        List<String> subs1 = 
admin.topics().getSubscriptions(topic1).stream().sorted().toList();
+        List<String> subs2 = 
admin.topics().getSubscriptions(topic2).stream().sorted().toList();
+        assertEquals(subs1, List.of(otherSubscription));
+        assertEquals(subs2, List.of(otherSubscription));
+    }
+
     private List<MessageId> publishMessagesOnPersistentTopic(String topicName, 
int messages) throws Exception {
         return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
     }

Reply via email to