poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958321455


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated 
with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client 
disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all 
producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all 
resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", 
topic);
+                    return CompletableFuture.failedFuture(new 
TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", 
topic);
-                closeFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();

Review Comment:
   The code in line:1294~line:1298 handles the logic 
`closeWithoutWaitingClientDisconnect`: 
   
   ```java
   CompletableFuture<Void> closePhase2Future;
   if (closeWithoutWaitingClientDisconnect){
       closePhase2Future = asyncCloseLedger();
   } else {
       closePhase2Future = closeClientsFuture.thenCompose(__ -> 
asyncCloseLedger());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to