This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cba1600d0f6 [fix] [broker] Close dispatchers stuck due to mismatch 
between dispatcher.consumerList and dispatcher.consumerSet (#22270)
cba1600d0f6 is described below

commit cba1600d0f6a82f1ea194f3214a80f283fe8dc27
Author: fengyubiao <[email protected]>
AuthorDate: Sat Mar 23 14:52:56 2024 +0800

    [fix] [broker] Close dispatchers stuck due to mismatch between 
dispatcher.consumerList and dispatcher.consumerSet (#22270)
---
 .../PersistentDispatcherMultipleConsumers.java     |  36 ++++++--
 .../PersistentDispatcherMultipleConsumersTest.java | 101 +++++++++++++++++++++
 2 files changed, 127 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index be82b190ffb..35204e7af72 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -217,15 +217,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             consumerList.remove(consumer);
             log.info("Removed consumer {} with pending {} acks", consumer, 
consumer.getPendingAcks().size());
             if (consumerList.isEmpty()) {
-                cancelPendingRead();
-
-                redeliveryMessages.clear();
-                redeliveryTracker.clear();
-                if (closeFuture != null) {
-                    log.info("[{}] All consumers removed. Subscription is 
disconnected", name);
-                    closeFuture.complete(null);
-                }
-                totalAvailablePermits = 0;
+                clearComponentsAfterRemovedAllConsumers();
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Consumer are left, reading more entries", 
name);
@@ -242,8 +234,29 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 readMoreEntries();
             }
         } else {
-            log.info("[{}] Trying to remove a non-connected consumer: {}", 
name, consumer);
+            /**
+             * This scenario is not expected and should never happen; this defensive code keeps the topic unloadable
+             * by removing this consumer's leftover entries from consumerList, which is out of sync with
+             * {@link #consumerSet}. See more detail: https://github.com/apache/pulsar/pull/22270.
+             */
+            log.error("[{}] Trying to remove a non-connected consumer: {}", 
name, consumer);
+            consumerList.removeIf(c -> consumer.equals(c));
+            if (consumerList.isEmpty()) {
+                clearComponentsAfterRemovedAllConsumers();
+            }
+        }
+    }
+
+    /**
+     * Resets the dispatcher state after the last consumer has been removed: cancels the pending read, clears the
+     * redelivery messages and the redelivery tracker, completes {@code closeFuture} if it is set, and sets the
+     * total available permits back to 0.
+     */
+    private synchronized void clearComponentsAfterRemovedAllConsumers() {
+        cancelPendingRead();
+
+        redeliveryMessages.clear();
+        redeliveryTracker.clear();
+        // closeFuture is only completed when present; presumably it is set by a close/disconnect request that is
+        // waiting for all consumers to go away — TODO confirm.
+        if (closeFuture != null) {
+            log.info("[{}] All consumers removed. Subscription is 
disconnected", name);
+            closeFuture.complete(null);
         }
+        totalAvailablePermits = 0;
     }
 
     @Override
@@ -554,6 +567,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         if (consumerList.isEmpty()) {
             closeFuture.complete(null);
         } else {
+            // The iterator of a CopyOnWriteArrayList walks over a snapshot of its internal array, and the list
+            // creates a new internal array whenever an item is added or removed. So removing items inside the
+            // for-each block is safe when the for-each and the add/remove are guarded by the same lock.
             consumerList.forEach(consumer -> 
consumer.disconnect(isResetCursor, assignedBrokerLookupData));
             cancelPendingRead();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
new file mode 100644
index 00000000000..f24c5c5933e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that a topic can still be deleted after the internal "consumerSet" and "consumerList" fields of a Shared
+ * subscription's dispatcher have been deliberately put out of sync via reflection. A stuck dispatcher would
+ * presumably make the topic deletion fail or run into the 30-second test timeout.
+ */
+@Slf4j
+@Test(groups = "broker-api")
+public class PersistentDispatcherMultipleConsumersTest extends 
ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Adds a second copy of the consumer at index 0 of the dispatcher's "consumerList", then closes the client
+     * consumer and deletes the topic.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscription, 
MessageId.earliest);
+
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        // Put "consumerSet" and "consumerList" out of sync.
+        Dispatcher dispatcher = pulsar.getBrokerService()
+                .getTopic(topicName, false).join().get()
+                .getSubscription(subscription).getDispatcher();
+        ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
+                WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+        List<org.apache.pulsar.broker.service.Consumer> consumerList =
+                WhiteboxImpl.getInternalState(dispatcher, "consumerList");
+
+        org.apache.pulsar.broker.service.Consumer serviceConsumer = 
consumerList.get(0);
+        // consumerSet is a set, so re-adding this consumer presumably leaves it unchanged (assuming the dispatcher
+        // already registered it there), whereas consumerList now holds the same consumer twice.
+        consumerSet.add(serviceConsumer);
+        consumerList.add(serviceConsumer);
+
+        // Verify: the topic can still be deleted successfully.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    /**
+     * Empties the dispatcher's "consumerSet" while leaving "consumerList" untouched, then closes the client
+     * consumer and deletes the topic.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testTopicDeleteIfConsumerSetMismatchConsumerList2() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscription, 
MessageId.earliest);
+
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        // Put "consumerSet" and "consumerList" out of sync.
+        Dispatcher dispatcher = pulsar.getBrokerService()
+                .getTopic(topicName, false).join().get()
+                .getSubscription(subscription).getDispatcher();
+        ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
+                WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+        consumerSet.clear();
+
+        // Verify: the topic can still be deleted successfully.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+}

Reply via email to