zhanghaou commented on code in PR #25272:
URL: https://github.com/apache/pulsar/pull/25272#discussion_r2893591146
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies,
RetentionPolicies retention) {
return checkBacklogQuota(quota, retention);
}
- private void clearBacklog(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription != null) {
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- subscription =
PersistentReplicator.getRemoteCluster(subscription);
- }
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic)
topic).clearBacklog(subscription));
+ private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName,
String bundleRange, String subscription) {
+ final NamespaceBundleFactory bundleFactory =
pulsar().getNamespaceService().getNamespaceBundleFactory();
+ final NamespaceBundle targetBundle =
bundleFactory.getBundle(nsName.toString(), bundleRange);
+ return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
Review Comment:
Fixed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies,
RetentionPolicies retention) {
return checkBacklogQuota(quota, retention);
}
- private void clearBacklog(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription != null) {
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- subscription =
PersistentReplicator.getRemoteCluster(subscription);
- }
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic)
topic).clearBacklog(subscription));
+ private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName,
String bundleRange, String subscription) {
+ final NamespaceBundleFactory bundleFactory =
pulsar().getNamespaceService().getNamespaceBundleFactory();
+ final NamespaceBundle targetBundle =
bundleFactory.getBundle(nsName.toString(), bundleRange);
+ return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
+ .thenCompose(topicsInNamespace -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ String effectiveSubscription = subscription;
+ if (effectiveSubscription != null
+ &&
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
{
+ effectiveSubscription =
PersistentReplicator.getRemoteCluster(effectiveSubscription);
}
- }
- } else {
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic) topic).clearBacklog());
+ final String finalSubscription = effectiveSubscription;
+
+ for (String topic : topicsInNamespace) {
+ TopicName topicName = TopicName.get(topic);
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ continue;
+ }
+ NamespaceBundle bundle =
bundleFactory.getBundle(topicName);
+ if (bundle == null || !bundle.equals(targetBundle)) {
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org