This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03cfbda0a33 [improve][test] add a test to ensure the exclusive
consumer will reconnect even if received ConsumerBusy error (#20621)
03cfbda0a33 is described below
commit 03cfbda0a33010159ffe904d61730bc11eaff34f
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 18 11:20:57 2023 +0800
[improve][test] add a test to ensure the exclusive consumer will reconnect
even if received ConsumerBusy error (#20621)
add a test to ensure the exclusive consumer will reconnect even if received
ConsumerBusy error
---
.../client/impl/ProducerConsumerInternalTest.java | 90 ++++++++++++++++++++++
1 file changed, 90 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
new file mode 100644
index 00000000000..5f242d44954
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Different with {@link
org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit
the variables
+ * of {@link ConsumerImpl} which are modified `protected`.
+ */
+@Test(groups = "broker-api")
+public class ProducerConsumerInternalTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws
Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "subscription1";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ final ConsumerImpl consumer = (ConsumerImpl)
pulsarClient.newConsumer().topic(topicName.toString())
+
.subscriptionType(SubscriptionType.Exclusive).subscriptionName(subscriptionName).subscribe();
+
+ ClientCnx clientCnx = consumer.getClientCnx();
+ ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService()
+
.getTopic(topicName,false).join().get().getSubscription(subscriptionName)
+ .getDispatcher().getConsumers().get(0).cnx();
+
+ // Make a disconnect to trigger broker remove the consumer which
related this connection.
+ // Make the second subscribe runs after the broker removing the old
consumer, then it will receive
+ // an error: "Exclusive consumer is already connected"
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ serverCnx.execute(() -> {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ clientCnx.close();
+ Thread.sleep(1000);
+ countDownLatch.countDown();
+
+ // Verify the consumer will always retry subscribe event received
ConsumerBusy error.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(consumer.getState(), HandlerState.State.Ready);
+ });
+
+ // cleanup.
+ consumer.close();
+ admin.topics().delete(topicName, false);
+ }
+}