This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03cfbda0a33 [improve][test] add a test to ensure the exclusive 
consumer will reconnect even if received ConsumerBusy error (#20621)
03cfbda0a33 is described below

commit 03cfbda0a33010159ffe904d61730bc11eaff34f
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 18 11:20:57 2023 +0800

    [improve][test] add a test to ensure the exclusive consumer will reconnect 
even if received ConsumerBusy error (#20621)
    
    add a test to ensure the exclusive consumer will reconnect even if received 
ConsumerBusy error
---
 .../client/impl/ProducerConsumerInternalTest.java  | 90 ++++++++++++++++++++++
 1 file changed, 90 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
new file mode 100644
index 00000000000..5f242d44954
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.CountDownLatch;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Different with {@link 
org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit 
the variables
+ * of {@link ConsumerImpl} which are modified `protected`.
+ */
+@Test(groups = "broker-api")
+public class ProducerConsumerInternalTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void 
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        final ConsumerImpl consumer = (ConsumerImpl) 
pulsarClient.newConsumer().topic(topicName.toString())
+                
.subscriptionType(SubscriptionType.Exclusive).subscriptionName(subscriptionName).subscribe();
+
+        ClientCnx clientCnx = consumer.getClientCnx();
+        ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService()
+                
.getTopic(topicName,false).join().get().getSubscription(subscriptionName)
+                .getDispatcher().getConsumers().get(0).cnx();
+
+        // Make a disconnect to trigger broker remove the consumer which 
related this connection.
+        // Make the second subscribe runs after the broker removing the old 
consumer, then it will receive
+        // an error: "Exclusive consumer is already connected"
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        serverCnx.execute(() -> {
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        clientCnx.close();
+        Thread.sleep(1000);
+        countDownLatch.countDown();
+
+        // Verify the consumer will always retry subscribe event received 
ConsumerBusy error.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getState(), HandlerState.State.Ready);
+        });
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+}

Reply via email to