This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 32a9d52409a330e10002c6c4d45d1f88fcac0ee6 Author: Rajan Dhabalia <[email protected]> AuthorDate: Tue Jan 25 04:59:30 2022 -0800 [pulsar-broker] clean up active consumer on already closed connection (#13196) ### Motivation We have been frequently seeing an issue with an exclusive subscription where the broker doesn't clean up consumers on a closed connection, and because of that the exclusive consumer keeps failing with `ConsumerBusyException` even though none of the consumers is actually connected to the broker. The log example below shows that the connection is closed but the broker still couldn't clean up the consumer, due to a race condition when the consumer quickly disconnects after creating the subscription, and subsequent consumer creation requests keep failing. ``` 23:30:16.896 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1 : 23:30:17.223 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Closed consumer, consumerId=20 : 23:30:17.291 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Created new subscription for 25 : 23:30:17.301 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.11.12.13:60851 : 23:30:17.302 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://my-prop/my-cluster/ns/topic, name=sub1}, consumerId=21, consumerName=c2302, address=/10.11.12.13:60851} : 23:30:17.302 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Created subscription on topic persistent://my-prop/my-cluster/ns/topic / sub1 : : : 23:30:17.496 [pulsar-io-23-36] WARN 
org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected 23:30:17.885 [pulsar-io-23-36] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1 23:30:17.886 [pulsar-io-23-36] WARN org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected 23:30:18.637 [pulsar-io-23-36] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1 ``` ### Modification Broker should clean up consumer if connection is already closed and allow consumer to reconnect as an active consumer. (cherry picked from commit 496afa7bd583c4b88a9fdc89a101be462e5c856f) --- .../pulsar/broker/service/AbstractTopic.java | 8 ++++ .../broker/service/persistent/PersistentTopic.java | 10 ++++ .../client/impl/BrokerClientIntegrationTest.java | 54 ++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ba00a20..183fb05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -250,6 +250,14 @@ public abstract class AbstractTopic implements Topic { return subscription.addConsumer(consumer); } + protected Consumer getActiveConsumer(Subscription subscription) { + Dispatcher dispatcher = subscription.getDispatcher(); + if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer) { + return ((AbstractDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer(); + } + return null; + } + @Override public void disableCnxAutoRead() { producers.values().forEach(producer -> 
producer.getCnx().disableCnxAutoRead()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3c08bd9..00934ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -794,6 +794,16 @@ public class PersistentTopic extends AbstractTopic if (ex.getCause() instanceof ConsumerBusyException) { log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, consumerName); + Consumer consumer = null; + try { + consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null; + // cleanup consumer if connection is already closed + if (consumer != null && !consumer.cnx().isActive()) { + consumer.close(); + } + } catch (Exception be) { + log.error("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage()); + } } else if (ex.getCause() instanceof SubscriptionBusyException) { log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage()); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 8e385a6..2be6067 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -69,6 +69,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.resources.BaseResources; +import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; 
+import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -1005,4 +1007,56 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { reader.close(); producer.close(); } + + @Test + public void testActiveConsumerCleanup() throws Exception { + log.info("-- Starting {} test --", methodName); + + int numMessages = 100; + final CountDownLatch latch = new CountDownLatch(numMessages); + String topic = "persistent://my-property/my-ns/closed-cnx-topic"; + String sub = "my-subscriber-name"; + + PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + c1.acknowledgeAsync(msg); + latch.countDown(); + }).subscribe(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + + AbstractDispatcherSingleActiveConsumer dispatcher = (AbstractDispatcherSingleActiveConsumer) topicRef + .getSubscription(sub).getDispatcher(); + ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx(); + Field field = ServerCnx.class.getDeclaredField("isActive"); + field.setAccessible(true); + field.set(cnx, false); + + assertNotNull(dispatcher.getActiveConsumer()); + + pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + Consumer<byte[]> consumer = null; + for (int i = 0; i < 2; i++) { + try { + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", 
receivedMessage); + c1.acknowledgeAsync(msg); + latch.countDown(); + }).subscribe(); + if (i == 0) { + fail("Should failed with ConsumerBusyException!"); + } + } catch (PulsarClientException.ConsumerBusyException ignore) { + // It's ok. + } + } + assertNotNull(consumer); + log.info("-- Exiting {} test --", methodName); + } + }
