This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e40719f844cf51c0d0c8eb2e330672dc26214c1d
Author: lipenghui <[email protected]>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800

    Close the previous reader of the healthcheck topic (#7724)

    (cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
 .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 072e91c..e78427c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -261,9 +264,11 @@ public class BrokersBase extends AdminResource {
         PulsarClient client = pulsar().getClient();
         String messageStr = UUID.randomUUID().toString();
 
-        // create non-partitioned topic manually
+        // create non-partitioned topic manually and close the previous reader if present.
         try {
-            pulsar().getBrokerService().getTopic(topic, true).get();
+            pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+                t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+            });
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;
