This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e40719f844cf51c0d0c8eb2e330672dc26214c1d
Author: lipenghui <[email protected]>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800

    Close the previous reader of the healthcheck topic (#7724)
    
    
    (cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
 .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 072e91c..e78427c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -261,9 +264,11 @@ public class BrokersBase extends AdminResource {
         PulsarClient client = pulsar().getClient();
 
         String messageStr = UUID.randomUUID().toString();
-        // create non-partitioned topic manually
+        // create non-partitioned topic manually and close the previous reader if present.
         try {
-            pulsar().getBrokerService().getTopic(topic, true).get();
+            pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+                t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+            });
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;

Reply via email to