codelipenghui commented on code in PR #18283:
URL: https://github.com/apache/pulsar/pull/18283#discussion_r1011247315
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1179,30 +1191,24 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions,
CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
if (closeIfClientsConnected) {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) ->
futures.add(replicator.disconnect()));
producers.values().forEach(producer ->
futures.add(producer.disconnect()));
- subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
- FutureUtil.waitForAll(futures).thenRun(() -> {
- closeClientFuture.complete(null);
- }).exceptionally(ex -> {
- log.error("[{}] Error closing clients", topic, ex);
- unfenceTopicToResume();
- closeClientFuture.completeExceptionally(ex);
- return null;
- });
- } else {
- closeClientFuture.complete(null);
}
+ FutureUtil.waitForAll(futures).thenRun(() -> {
+ closeClientFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}] Error closing clients", topic, ex);
+ unfenceTopicToResume();
+ closeClientFuture.completeExceptionally(ex);
+ return null;
+ });
closeClientFuture.thenAccept(delete -> {
- // We can proceed with the deletion if either:
- // 1. No one is connected
- // 2. We want to kick out everyone and forcefully delete
the topic.
- // In this case, we shouldn't care if the usageCount
is 0 or not, just proceed
- if (currentUsageCount() == 0 || (closeIfClientsConnected
&& !failIfHasSubscriptions)) {
+ if (currentUsageCount() == 0) {
Review Comment:
Yes, you are right @poorbarcode
The `closeIfClientsConnected` and `failIfHasSubscriptions` is validated
before we reach here. It looks like duplicated validation.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1170,6 +1179,9 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions,
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did
not catch up: " + backlogSubs));
+ } else if (!closeIfClientsConnected && currentUsageCount() != 0 &&
!failIfHasBacklogs) {
+ return FutureUtil.failedFuture(new TopicBusyException(
+ "Topic has " + currentUsageCount() + " connected
producers/consumers"));
Review Comment:
@poorbarcode We have write lock here
https://github.com/apache/pulsar/pull/18283/files#diff-5edf14cc6f25857d0cfdd26b2d3b3141230ecfb0dfa95aebf7583fd76ede4c4bR1161
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]