This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 32a9d52409a330e10002c6c4d45d1f88fcac0ee6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Jan 25 04:59:30 2022 -0800

    [pulsar-broker] clean up active consumer on already closed connection 
(#13196)
    
    ### Motivation
    
    We have been frequently seeing an issue with an exclusive subscription 
where the broker doesn't clean up consumers on a closed connection, and because 
of that the exclusive consumer keeps failing with `ConsumerBusyException` even 
though none of the consumers is actually connected to the broker.
    The log example below shows that the connection is closed but the broker 
still couldn't clean up the consumer, due to a race condition when the consumer 
quickly disconnects after creating the subscription; as a result, subsequent 
consumer creation requests keep failing.
    
    ```
    23:30:16.896 [pulsar-io-23-42] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Subscribing 
on topic persistent://my-prop/my-cluster/ns/topic / sub1
    :
    23:30:17.223 [pulsar-io-23-42] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Closed 
consumer, consumerId=20
    :
    23:30:17.291 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://my-prop/my-cluster/ns/topic][sub1] Created new subscription for 25
    :
    23:30:17.301 [pulsar-io-23-42] INFO  
org.apache.pulsar.broker.service.ServerCnx - Closed connection from 
/10.11.12.13:60851
    :
    23:30:17.302 [pulsar-io-23-42] INFO  
org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - 
Removing consumer 
Consumer{subscription=PersistentSubscription{topic=persistent://my-prop/my-cluster/ns/topic,
 name=sub1}, consumerId=21, consumerName=c2302, address=/10.11.12.13:60851}
    :
    23:30:17.302 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Created 
subscription on topic persistent://my-prop/my-cluster/ns/topic / sub1
    :
    :
    :
    
    23:30:17.496 [pulsar-io-23-36] WARN  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already 
connected
    23:30:17.885 [pulsar-io-23-36] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing 
on topic persistent://my-prop/my-cluster/ns/topic / sub1
    23:30:17.886 [pulsar-io-23-36] WARN  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already 
connected
    23:30:18.637 [pulsar-io-23-36] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing 
on topic persistent://my-prop/my-cluster/ns/topic / sub1
    ```
    
    ### Modification
    The broker should clean up the consumer if its connection is already closed 
and allow the consumer to reconnect as an active consumer.
    
    (cherry picked from commit 496afa7bd583c4b88a9fdc89a101be462e5c856f)
---
 .../pulsar/broker/service/AbstractTopic.java       |  8 ++++
 .../broker/service/persistent/PersistentTopic.java | 10 ++++
 .../client/impl/BrokerClientIntegrationTest.java   | 54 ++++++++++++++++++++++
 3 files changed, 72 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ba00a20..183fb05 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -250,6 +250,14 @@ public abstract class AbstractTopic implements Topic {
         return subscription.addConsumer(consumer);
     }
 
+    protected Consumer getActiveConsumer(Subscription subscription) {
+        Dispatcher dispatcher = subscription.getDispatcher();
+        if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer) {
+            return ((AbstractDispatcherSingleActiveConsumer) 
dispatcher).getActiveConsumer();
+        }
+        return null;
+    }
+
     @Override
     public void disableCnxAutoRead() {
         producers.values().forEach(producer -> 
producer.getCnx().disableCnxAutoRead());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3c08bd9..00934ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -794,6 +794,16 @@ public class PersistentTopic extends AbstractTopic
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", 
topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? 
getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();
+                        }
+                    } catch (Exception be) {
+                        log.error("Failed to clean up consumer on closed 
connection {}, {}", consumer, be.getMessage());
+                    }
                 } else if (ex.getCause() instanceof SubscriptionBusyException) 
{
                     log.warn("[{}][{}] {}", topic, subscriptionName, 
ex.getMessage());
                 } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 8e385a6..2be6067 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -69,6 +69,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.resources.BaseResources;
+import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1005,4 +1007,56 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testActiveConsumerCleanup() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numMessages = 100;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+        String sub = "my-subscriber-name";
+
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
+            Assert.assertNotNull(msg, "Message cannot be null");
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message [{}] in the listener", 
receivedMessage);
+            c1.acknowledgeAsync(msg);
+            latch.countDown();
+        }).subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+
+        AbstractDispatcherSingleActiveConsumer dispatcher = 
(AbstractDispatcherSingleActiveConsumer) topicRef
+                .getSubscription(sub).getDispatcher();
+        ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
+        Field field = ServerCnx.class.getDeclaredField("isActive");
+        field.setAccessible(true);
+        field.set(cnx, false);
+
+        assertNotNull(dispatcher.getActiveConsumer());
+
+        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        Consumer<byte[]> consumer = null;
+        for (int i = 0; i < 2; i++) {
+            try {
+                consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", 
receivedMessage);
+                    c1.acknowledgeAsync(msg);
+                    latch.countDown();
+                }).subscribe();
+                if (i == 0) {
+                    fail("Should failed with ConsumerBusyException!");
+                }
+            } catch (PulsarClientException.ConsumerBusyException ignore) {
+               // It's ok.
+            }
+        }
+        assertNotNull(consumer);
+        log.info("-- Exiting {} test --", methodName);
+    }
+
 }

Reply via email to