This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f83251a57fd [fix][broker] Support namespace unsubscribe when bundles
are unloaded (#25276)
f83251a57fd is described below
commit f83251a57fd5f4bc20cd47ec7b950d6321d03488
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Mar 6 02:27:14 2026 +0800
[fix][broker] Support namespace unsubscribe when bundles are unloaded
(#25276)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 156 +++++++--------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 29 ++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 29 ++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 215 +++++++++++++++++++++
4 files changed, 328 insertions(+), 101 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bfbd6523481..b9ec275f79c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1667,70 +1667,51 @@ public abstract class NamespacesBase extends
AdminResource {
subscription, namespaceName, bundleRange);
}
- protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse,
String subscription,
- boolean authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.UNSUBSCRIBE);
+ protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String
subscription,
+
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- try {
- NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle nsBundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if not then
there are no subscriptions
- if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
-
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
- namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
- }
- }
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- log.warn("[{}] Failed to unsubscribe {} on the bundles for
namespace {}: {}", clientAppId(),
- subscription, namespaceName,
exception.getCause().getMessage());
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return null;
- }
- }
- log.info("[{}] Successfully unsubscribed {} on all the bundles for
namespace {}", clientAppId(),
- subscription, namespaceName);
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.UNSUBSCRIBE)
+ .thenCompose(__ ->
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundlesAsync(namespaceName))
+ .thenCompose(bundles -> {
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>();
+ for (NamespaceBundle nsBundle : bundles.getBundles()) {
+ try {
+
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
+ namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures);
+ }).thenRun(() -> log.info("[{}] Successfully unsubscribed {}
on all the bundles for namespace {}",
+ clientAppId(), subscription, namespaceName));
}
@SuppressWarnings("deprecation")
- protected void internalUnsubscribeNamespaceBundle(String subscription,
String bundleRange, boolean authoritative) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.UNSUBSCRIBE);
+ protected CompletableFuture<Void>
internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
+
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");
- Policies policies = getNamespacePolicies(namespaceName);
-
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace: redirect
if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
- }
-
- validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, authoritative, true);
-
- unsubscribe(namespaceName, bundleRange, subscription);
- log.info("[{}] Successfully unsubscribed {} on namespace bundle
{}/{}", clientAppId(), subscription,
- namespaceName, bundleRange);
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.UNSUBSCRIBE)
+ .thenCompose(__ -> {
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global
namespace: redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return
validateClusterOwnershipAsync(namespaceName.getCluster())
+ .thenCompose(unused ->
validateClusterForTenantAsync(namespaceName.getTenant(),
+ namespaceName.getCluster()));
+ })
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies ->
+ validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ authoritative, false))
+ .thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
+ .thenRun(() -> log.info("[{}] Successfully unsubscribed {} on
namespace bundle {}/{}",
+ clientAppId(), subscription, namespaceName,
bundleRange));
}
protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode
subscriptionAuthMode) {
@@ -1918,32 +1899,45 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
- private void unsubscribe(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot
unsubscribe a replication cursor");
- } else {
- for (Topic topic : topicList) {
- Subscription sub = topic.getSubscription(subscription);
- if (sub != null) {
- futures.add(sub.delete());
- }
- }
- }
- FutureUtil.waitForAll(futures).get();
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to unsubscribe {} for namespace {}/{}",
clientAppId(), subscription,
- nsName.toString(), bundleRange, e);
- if (e.getCause() instanceof SubscriptionBusyException) {
- throw new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers");
- }
- throw new RestException(e.getCause());
+ private CompletableFuture<Void> unsubscribeAsync(NamespaceBundle bundle,
String subscription) {
+ if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
+ return CompletableFuture.failedFuture(
+ new RestException(Status.PRECONDITION_FAILED, "Cannot
unsubscribe a replication cursor"));
}
+
+ return
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
+ .thenCompose(topicsInBundle -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topic : topicsInBundle) {
+ TopicName topicName = TopicName.get(topic);
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ continue;
+ }
+
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
+ .thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ Topic loaded = optTopic.get();
+ Subscription sub =
loaded.getSubscription(subscription);
+ if (sub == null) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ return sub.delete();
+ }));
+ }
+ return FutureUtil.waitForAll(futures);
+ }).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof RestException) {
+ throw (RestException) cause;
+ }
+ if (cause instanceof SubscriptionBusyException) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers");
+ }
+ throw new RestException(cause);
+ });
}
protected BundlesData validateBundlesData(BundlesData initialBundles) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index cad6899c8a2..1809456476c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1446,14 +1446,15 @@ public class Namespaces extends NamespacesBase {
@PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateNamespaceName(property, cluster, namespace);
- internalUnsubscribeNamespace(asyncResponse, subscription,
authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalUnsubscribeNamespaceAsync(subscription, authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to unsubscribe {} on namespace {}",
clientAppId(),
+ subscription, namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1462,12 +1463,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void unsubscribeNamespaceBundle(@PathParam("property") String
property, @PathParam("cluster") String cluster,
+ public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(property, cluster, namespace);
- internalUnsubscribeNamespaceBundle(subscription, bundleRange,
authoritative);
+ internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to unsubscribe {} on namespace
bundle {}/{}", clientAppId(),
+ subscription, namespaceName, bundleRange, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 90f4b087bfe..53501d16f48 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1588,14 +1588,15 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateNamespaceName(tenant, namespace);
- internalUnsubscribeNamespace(asyncResponse, subscription,
authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalUnsubscribeNamespaceAsync(subscription, authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to unsubscribe {} on namespace {}",
clientAppId(),
+ subscription, namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -1605,12 +1606,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant,
+ public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(tenant, namespace);
- internalUnsubscribeNamespaceBundle(subscription, bundleRange,
authoritative);
+ internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to unsubscribe {} on namespace
bundle {}/{}", clientAppId(),
+ subscription, namespaceName, bundleRange, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 663a6f0a419..8dabc75e52d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2339,6 +2339,221 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
new ArrayList<>());
}
+ @Test(dataProvider = "numBundles")
+ public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer
numBundles) throws Exception {
+ String namespace = "prop-xyz/ns-unsub-bundle";
+ admin.namespaces().createNamespace(namespace, numBundles);
+
+ String topic = "persistent://" + namespace + "/t1";
+ String subscription = "sub1";
+ String otherSubscription = "sub2";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ consumer1.close();
+ consumer2.close();
+
+ NamespaceBundle bundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle.getBundleRange());
+
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+
+ admin.namespaces().unsubscribeNamespaceBundle(namespace,
bundle.getBundleRange(), subscription);
+
+ List<String> subscriptions =
+
admin.topics().getSubscriptions(topic).stream().sorted().toList();
+ assertEquals(subscriptions, List.of(otherSubscription));
+ }
+
+ @Test(dataProvider = "numBundles")
+ public void testUnsubscribeNamespaceOnUnloadedBundle(Integer numBundles)
throws Exception {
+ String namespace = "prop-xyz/ns-unsub-namespace";
+ admin.namespaces().createNamespace(namespace, numBundles);
+
+ String topic = "persistent://" + namespace + "/t1";
+ String subscription = "sub1";
+ String otherSubscription = "sub2";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ consumer1.close();
+ consumer2.close();
+
+ NamespaceBundle bundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic));
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle.getBundleRange());
+
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+
+ admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+ List<String> subscriptions =
+
admin.topics().getSubscriptions(topic).stream().sorted().toList();
+ assertEquals(subscriptions, List.of(otherSubscription));
+ }
+
+ @Test(dataProvider = "numBundles")
+ public void
testUnsubscribeNamespaceOnUnloadedBundleWithPartitionedTopic(Integer
numBundles) throws Exception {
+ String namespace = "prop-xyz/ns-unsub-partitioned";
+ admin.namespaces().createNamespace(namespace, numBundles);
+
+ String topic = "persistent://" + namespace + "/pt";
+ int partitions = 3;
+ admin.topics().createPartitionedTopic(topic, partitions);
+
+ String subscription = "sub1";
+ String otherSubscription = "sub2";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ consumer1.close();
+ consumer2.close();
+
+ Set<NamespaceBundle> bundles = new HashSet<>();
+ for (int i = 0; i < partitions; i++) {
+ String partitionTopic =
TopicName.get(topic).getPartition(i).toString();
+
bundles.add(pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(TopicName.get(partitionTopic)));
+ }
+ for (NamespaceBundle bundle : bundles) {
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle.getBundleRange());
+ }
+
+ for (NamespaceBundle bundle : bundles) {
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current
broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+ }
+
+ admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+ for (int i = 0; i < partitions; i++) {
+ String partitionTopic =
TopicName.get(topic).getPartition(i).toString();
+ List<String> subs =
admin.topics().getSubscriptions(partitionTopic).stream().sorted().toList();
+ assertEquals(subs, List.of(otherSubscription));
+ }
+ }
+
+ @Test
+ public void
testUnsubscribeNamespaceOnUnloadedBundleWithMultiTopicCrossBundle() throws
Exception {
+ String namespace = "prop-xyz/ns-unsub-cross-bundle";
+ admin.namespaces().createNamespace(namespace, 4);
+
+ String subscription = "sub1";
+ String otherSubscription = "sub2";
+
+ String topic1 = null;
+ String topic2 = null;
+ NamespaceBundle bundle1 = null;
+ NamespaceBundle bundle2 = null;
+ for (int i = 0; i < 50; i++) {
+ String candidate = "persistent://" + namespace + "/t-" + i;
+ @Cleanup
+ Consumer<byte[]> c = pulsarClient.newConsumer()
+ .topic(candidate)
+ .subscriptionName(subscription)
+ .subscribe();
+ c.close();
+ NamespaceBundle bundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(candidate));
+ if (topic1 == null) {
+ topic1 = candidate;
+ bundle1 = bundle;
+ continue;
+ }
+ if (!bundle.equals(bundle1)) {
+ topic2 = candidate;
+ bundle2 = bundle;
+ break;
+ }
+ }
+ assertNotNull(topic1);
+ assertNotNull(topic2);
+ assertNotNull(bundle1);
+ assertNotNull(bundle2);
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic1)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic2)
+ .subscriptionName(otherSubscription)
+ .subscribe();
+ consumer1.close();
+ consumer2.close();
+
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle1.getBundleRange());
+ admin.namespaces().unloadNamespaceBundle(namespace,
bundle2.getBundleRange());
+
+ for (NamespaceBundle bundle : List.of(bundle1, bundle2)) {
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by current
broker");
+
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle),
+ "Bundle should not be owned by other broker");
+ });
+ }
+
+ admin.namespaces().unsubscribeNamespace(namespace, subscription);
+
+ List<String> subs1 =
admin.topics().getSubscriptions(topic1).stream().sorted().toList();
+ List<String> subs2 =
admin.topics().getSubscriptions(topic2).stream().sorted().toList();
+ assertEquals(subs1, List.of(otherSubscription));
+ assertEquals(subs2, List.of(otherSubscription));
+ }
+
private List<MessageId> publishMessagesOnPersistentTopic(String topicName,
int messages) throws Exception {
return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
}