eolivelli commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r780710799
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
##########
@@ -997,4 +999,53 @@ public void
testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled)
reader.close();
producer.close();
}
+
+ @Test
+ public void testActiveConsumerCleanup() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ int numMessages = 100;
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+ String sub = "my-subscriber-name";
+
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ c1.acknowledgeAsync(msg);
+ latch.countDown();
+ }).subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+
+ AbstractDispatcherSingleActiveConsumer dispatcher =
(AbstractDispatcherSingleActiveConsumer) topicRef
+ .getSubscription(sub).getDispatcher();
+ ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
+ Field field = ServerCnx.class.getDeclaredField("isActive");
+ field.setAccessible(true);
+ field.set(cnx, false);
+
+ assertNotNull(dispatcher.getActiveConsumer());
+
+ pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ Consumer<byte[]> consumer = null;
+ for (int i = 0; i < 2; i++) {
+ try {
+ consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ c1.acknowledgeAsync(msg);
+ latch.countDown();
+ }).subscribe();
+ } catch (Exception e) {
+ // Ok
Review comment:
Can we perform some checks on this Exception? At least match the Java
class.
Otherwise here we can pass the tests even for other kinds of bugs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]