Denovo1998 commented on code in PR #25276:
URL: https://github.com/apache/pulsar/pull/25276#discussion_r2878093572
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1918,32 +1899,50 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc
}
}
- private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
- try {
- List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
- } else {
- for (Topic topic : topicList) {
- Subscription sub = topic.getSubscription(subscription);
- if (sub != null) {
- futures.add(sub.delete());
- }
- }
- }
- FutureUtil.waitForAll(futures).get();
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription,
- nsName.toString(), bundleRange, e);
- if (e.getCause() instanceof SubscriptionBusyException) {
- throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
- }
- throw new RestException(e.getCause());
+ private CompletableFuture<Void> unsubscribeAsync(NamespaceName nsName, String bundleRange, String subscription) {
+ if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
+ return CompletableFuture.failedFuture(
+ new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"));
}
+
+ return pulsar().getNamespaceService().getFullListOfTopics(nsName)
+ .thenCompose(topicsInNamespace -> {
Review Comment:
Could this reuse the logic in NamespaceService#getOwnedTopicListForNamespaceBundle,
or alternatively filter the topics with targetBundle.includes?
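To illustrate the shape I have in mind, here is a rough, self-contained sketch. It is not Pulsar code: the class name, the `bundleIncludes` predicate (standing in for a bundle membership check such as targetBundle.includes) and the `deleteSubscription` function are hypothetical placeholders, and topics are plain strings. It only shows the pattern of filtering the namespace's topics by bundle and then combining the per-topic deletions into one future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Predicate;

public class BundleUnsubscribeSketch {

    /**
     * Keeps only the topics that belong to the target bundle and deletes the
     * subscription on each of them. All parameters are hypothetical stand-ins,
     * not Pulsar APIs.
     */
    static CompletableFuture<Void> unsubscribeInBundle(
            List<String> topicsInNamespace,
            Predicate<String> bundleIncludes,
            Function<String, CompletableFuture<Void>> deleteSubscription) {
        CompletableFuture<?>[] deletions = topicsInNamespace.stream()
                .filter(bundleIncludes)          // bundle membership check
                .map(deleteSubscription)         // one async deletion per topic
                .toArray(CompletableFuture[]::new);
        // Completes when every deletion completes; fails if any of them fails.
        return CompletableFuture.allOf(deletions);
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://tenant/ns/a",
                "persistent://tenant/ns/b",
                "persistent://tenant/ns/c");
        List<String> deleted = new CopyOnWriteArrayList<>();

        // Pretend the target bundle owns every topic except ".../b".
        unsubscribeInBundle(
                topics,
                topic -> !topic.endsWith("/b"),
                topic -> {
                    deleted.add(topic);
                    return CompletableFuture.completedFuture(null);
                }).join();

        System.out.println(deleted);
    }
}
```

Whether the filter comes from an existing NamespaceService helper or from a direct bundle check, keeping the membership test in one place would avoid duplicating the bundle-resolution logic in NamespacesBase.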
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java:
##########
@@ -2339,6 +2339,92 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
new ArrayList<>());
}
+ @Test(dataProvider = "numBundles")
+ public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer numBundles) throws Exception {
Review Comment:
Could we add a few more unit tests?
1) A partitioned topic (ensure the subscription is deleted on every
partition);
2) Multiple topics spread across different bundles.