This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a52945b1c51 [fix] [broker] fix mismatch between 
dispatcher.consumerList and dispatcher.consumerSet (#22283)
a52945b1c51 is described below

commit a52945b1c51fa874667eecb9fea9bf03e5d6153b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Mar 26 07:41:07 2024 +0800

    [fix] [broker] fix mismatch between dispatcher.consumerList and 
dispatcher.consumerSet (#22283)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  16 ++-
 .../PersistentDispatcherMultipleConsumers.java     |   8 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   5 +-
 ...impleProducerConsumerMLInitializeDelayTest.java | 108 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3ab25eb098c..4f82f416ed2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1224,10 +1224,20 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
                                 "Consumer is already present on the 
connection");
                     } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+                        log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
                         ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-                                String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
-                                        remoteAddress, subscriptionName));
-                        consumers.remove(consumerId, existingConsumerFuture);
+                                String.format("A failed consumer with id is 
already present on the connection."
+                                                + " consumerId: %s, 
remoteAddress: %s, subscription: %s",
+                                        consumerId, remoteAddress, 
subscriptionName));
+                        /**
+                         * This future may have failed because the client closed 
an in-progress subscribe request.
+                         * See {@link 
#handleCloseConsumer(CommandCloseConsumer)}
+                         * Do not remove the failed future here; it 
will be removed once the previous
+                         * subscribe attempt has finished.
+                         * Until the previous subscribe attempt has finished, any 
new subscribe request will always fail.
+                         * This mechanism avoids the more complex logic that 
would otherwise be needed to handle the race conditions.
+                         */
                         commandSender.sendErrorResponse(requestId, error,
                                 "Consumer that failed is already present on 
the connection");
                     } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 35204e7af72..039104fe022 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -190,9 +190,15 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
 
         if (isConsumersExceededOnSubscription()) {
-            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit {}",
+                    name, consumer);
             return FutureUtil.failedFuture(new 
ConsumerBusyException("Subscription reached max consumers limit"));
         }
+        // This scenario is not expected and should never happen normally. 
Just log a warning if the unexpected
+        // scenario does happen. For more details, see: 
https://github.com/apache/pulsar/pull/22283.
+        if (consumerSet.contains(consumer)) {
+            log.warn("[{}] Attempting to add a consumer that already 
registered {}", name, consumer);
+        }
 
         consumerList.add(consumer);
         if (consumerList.size() > 1
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e195f220f87..1cb2f76c5e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -3382,8 +3382,9 @@ public class ServerCnxTest {
             };
             // assert error response
             assertTrue(responseAssert.test(responseAssert));
-            // assert consumer-delete event occur
-            assertEquals(1L,
+            // The delete event will only occur after the future is completed.
+            // assert consumer-delete event will not occur.
+            assertEquals(0L,
                     deleteTimesMark.getAllValues().stream().filter(f -> f == 
existingConsumerFuture).count());
             // Server will not close the connection
             assertTrue(channel.isOpen());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
new file mode 100644
index 00000000000..ab4e063ae3d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerMLInitializeDelayTest extends 
ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setTopicLoadTimeoutSeconds(60 * 5);
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testConsumerListMatchesConsumerSet() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "sub";
+        final int clientOperationTimeout = 3;
+        final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
+        final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Create a client with a low operation timeout.
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
+                .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
+                .build();
+        Consumer consumer = client.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        // Inject a delay into the initialization of the ML, so that the consumer 
registers twice.
+        // The consumer registers twice: the first attempt times out, and then it tries again.
+        AtomicInteger delayTimes = new AtomicInteger();
+        mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> {
+            if (op.toString().equals("GET") && 
s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
+                return delayTimes.incrementAndGet() == 1;
+            }
+            return false;
+        });
+        admin.topics().unload(topicName);
+        // Verify: eventually, "dispatcher.consumerSet.size" equals 
"dispatcher.consumerList.size".
+        Awaitility.await().atMost(Duration.ofSeconds(loadMLDelayMillis * 3))
+                .ignoreExceptions().untilAsserted(() -> {
+            Dispatcher dispatcher = pulsar.getBrokerService()
+                            .getTopic(topicName, false).join().get()
+                            .getSubscription(subName).getDispatcher();
+            ObjectSet consumerSet = WhiteboxImpl.getInternalState(dispatcher, 
"consumerSet");
+            List consumerList = WhiteboxImpl.getInternalState(dispatcher, 
"consumerList");
+            log.info("consumerSet_size: {}, consumerList_size: {}", 
consumerSet.size(), consumerList.size());
+            Assert.assertEquals(consumerList.size(), 1);
+            Assert.assertEquals(consumerSet.size(), 1);
+        });
+
+        // Verify: the topic can be deleted.
+        consumer.close();
+        admin.topics().delete(topicName);
+        // cleanup.
+        client.close();
+    }
+}

Reply via email to