heesung-sn commented on code in PR #21682:
URL: https://github.com/apache/pulsar/pull/21682#discussion_r1419600767


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java:
##########
@@ -147,12 +151,13 @@ public synchronized void consumerFlow(Consumer consumer, 
int additionalNumberOfM
     }
 
     @Override
-    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean 
isResetCursor) {
+    public synchronized CompletableFuture<Void> disconnectAllConsumers(
+            boolean isResetCursor, Optional<BrokerLookupData> 
assignedBrokerLookupData) {
         closeFuture = new CompletableFuture<>();
         if (consumerList.isEmpty()) {
             closeFuture.complete(null);
         } else {
-            consumerList.forEach(Consumer::disconnect);

Review Comment:
   nit: it seems odd that previously we didn't pass `isResetCursor` to 
`consumer.disconnect`. Now we are passing it.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -889,53 +890,79 @@ public int getTotalNonContiguousDeletedMessagesRange() {
      *
      * @return CompletableFuture indicating the completion of delete operation
      */
-    @Override
-    public CompletableFuture<Void> close() {
-        synchronized (this) {
-            if (dispatcher != null && dispatcher.isConsumerConnected()) {
-                return FutureUtil.failedFuture(new 
SubscriptionBusyException("Subscription has active consumers"));
-            }
-            return this.pendingAckHandle.closeAsync().thenAccept(v -> {
-                IS_FENCED_UPDATER.set(this, TRUE);
-                log.info("[{}][{}] Successfully closed subscription [{}]", 
topicName, subName, cursor);
-            });
+    private synchronized CompletableFuture<Void> close(boolean 
checkActiveConsumers) {
+        if (checkActiveConsumers && dispatcher != null && 
dispatcher.isConsumerConnected()) {
+            return FutureUtil.failedFuture(new 
SubscriptionBusyException("Subscription has active consumers"));
         }
+        return this.pendingAckHandle.closeAsync().thenAccept(v -> {
+            IS_FENCED_UPDATER.set(this, TRUE);
+            log.info("[{}][{}] Successfully closed subscription [{}]", 
topicName, subName, cursor);
+        });
+    }
+
+
+    /**
+     * Disconnect all consumers from this subscription.
+     *
+     * @return CompletableFuture indicating the completion of the operation.
+     */
+    @Override
+    public synchronized CompletableFuture<Void> 
disconnect(Optional<BrokerLookupData> assignedBrokerLookupData) {
+        CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+
+        (dispatcher != null
+                ? dispatcher.disconnectAllConsumers(false, 
assignedBrokerLookupData)
+                : CompletableFuture.completedFuture(null))
+                .thenRun(() -> {
+                    log.info("[{}][{}] Successfully disconnected subscription 
consumers", topicName, subName);
+                    disconnectFuture.complete(null);
+                }).exceptionally(exception -> {
+                    log.error("[{}][{}] Error disconnecting subscription 
consumers", topicName, subName, exception);
+                    disconnectFuture.completeExceptionally(exception);
+                    return null;
+                });
+
+        return disconnectFuture;
     }
 
     /**
-     * Disconnect all consumers attached to the dispatcher and close this 
subscription.
+     * Fence this subscription and optionally disconnect all consumers.
      *
-     * @return CompletableFuture indicating the completion of disconnect 
operation
+     * @return CompletableFuture indicating the completion of the operation.
      */
     @Override
-    public synchronized CompletableFuture<Void> disconnect() {
-        if (fenceFuture != null){
+    public synchronized CompletableFuture<Void> close(boolean 
disconnectConsumers,
+                                                      
Optional<BrokerLookupData> assignedBrokerLookupData) {
+        if (fenceFuture != null) {
             return fenceFuture;
         }
+
         fenceFuture = new CompletableFuture<>();
 
         // block any further consumers on this subscription
         IS_FENCED_UPDATER.set(this, TRUE);
 
-        (dispatcher != null ? dispatcher.close() : 
CompletableFuture.completedFuture(null))
-                .thenCompose(v -> close()).thenRun(() -> {
-                    log.info("[{}][{}] Successfully disconnected and closed 
subscription", topicName, subName);
+        (dispatcher != null
+                ? dispatcher.close(disconnectConsumers, 
assignedBrokerLookupData)
+                : CompletableFuture.completedFuture(null))
+                .thenCompose(v -> close(false)).thenRun(() -> {

Review Comment:
   Previously, it was `v->close(true)`. Shouldn't it be 
`v->close(disconnectConsumers)`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java:
##########
@@ -272,11 +275,12 @@ public boolean isClosed() {
      *

Review Comment:
   nit: please update the Javadoc with the new parameters.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -542,6 +549,10 @@ public SubType getType() {
 
     @Override
     public final synchronized void readEntriesComplete(List<Entry> entries, 
Object ctx) {
+        if (topic.isTransferring()) {

Review Comment:
   Don't we need to release the entries?
   `entry.release();`
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -154,7 +154,11 @@ public void readEntriesComplete(final List<Entry> entries, 
Object obj) {
         executor.execute(() -> internalReadEntriesComplete(entries, obj));
     }
 
-    public synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
+    private synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
+        if (topic.isTransferring()) {

Review Comment:
   Don't we need to release the entries?
   `entry.release();`
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java:
##########
@@ -289,28 +309,30 @@ public CompletableFuture<Void> close() {
      * @return CompletableFuture indicating the completion of disconnect 
operation
      */
     @Override
-    public synchronized CompletableFuture<Void> disconnect() {
-        CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+    public synchronized CompletableFuture<Void> close(boolean 
disconnectConsumers,

Review Comment:
   nit: can you update the comments above?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -308,7 +309,7 @@ public synchronized void removeConsumer(Consumer consumer, 
boolean isResetCursor
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the 
subscription as well
-                this.close().thenRun(() -> {
+                this.close(false).thenRun(() -> {

Review Comment:
   Previously, it closed only after checking for active consumers. Shouldn't it be 
`close(true)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to