This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a52945b1c51 [fix] [broker] fix mismatch between
dispatcher.consumerList and dispatcher.consumerSet (#22283)
a52945b1c51 is described below
commit a52945b1c51fa874667eecb9fea9bf03e5d6153b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Mar 26 07:41:07 2024 +0800
[fix] [broker] fix mismatch between dispatcher.consumerList and
dispatcher.consumerSet (#22283)
---
.../apache/pulsar/broker/service/ServerCnx.java | 16 ++-
.../PersistentDispatcherMultipleConsumers.java | 8 +-
.../pulsar/broker/service/ServerCnxTest.java | 5 +-
...impleProducerConsumerMLInitializeDelayTest.java | 108 +++++++++++++++++++++
4 files changed, 131 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3ab25eb098c..4f82f416ed2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1224,10 +1224,20 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendErrorResponse(requestId,
ServerError.ServiceNotReady,
"Consumer is already present on the
connection");
} else if
(existingConsumerFuture.isCompletedExceptionally()){
+ log.warn("[{}][{}][{}] A failed consumer with id is
already present on the connection,"
+ + " consumerId={}", remoteAddress, topicName,
subscriptionName, consumerId);
ServerError error =
getErrorCodeWithErrorLog(existingConsumerFuture, true,
- String.format("Consumer subscribe failure.
remoteAddress: %s, subscription: %s",
- remoteAddress, subscriptionName));
- consumers.remove(consumerId, existingConsumerFuture);
+ String.format("A failed consumer with id is
already present on the connection."
+ + " consumerId: %s,
remoteAddress: %s, subscription: %s",
+ consumerId, remoteAddress,
subscriptionName));
+ /**
+ * This future may have failed because the client closed an in-progress subscribe request.
+ * See {@link #handleCloseConsumer(CommandCloseConsumer)}
+ * Do not remove the failed future here; it will be removed once the previous subscribe
+ * attempt finishes. Until then, every new subscribe request for this consumer id fails.
+ * This avoids the more complex logic that would otherwise be needed to handle the
+ * resulting race conditions.
+ */
commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on
the connection");
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 35204e7af72..039104fe022 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -190,9 +190,15 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
if (isConsumersExceededOnSubscription()) {
- log.warn("[{}] Attempting to add consumer to subscription which
reached max consumers limit", name);
+ log.warn("[{}] Attempting to add consumer to subscription which
reached max consumers limit {}",
+ name, consumer);
return FutureUtil.failedFuture(new
ConsumerBusyException("Subscription reached max consumers limit"));
}
+ // This scenario is not expected and should never happen; if it does, only a warning is logged.
+ // See https://github.com/apache/pulsar/pull/22283 for more details.
+ if (consumerSet.contains(consumer)) {
+ log.warn("[{}] Attempting to add a consumer that already
registered {}", name, consumer);
+ }
consumerList.add(consumer);
if (consumerList.size() > 1
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e195f220f87..1cb2f76c5e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -3382,8 +3382,9 @@ public class ServerCnxTest {
};
// assert error response
assertTrue(responseAssert.test(responseAssert));
- // assert consumer-delete event occur
- assertEquals(1L,
+ // The failed future is not removed when the new subscribe request is rejected (only after the
+ // previous subscribe attempt finishes), so assert that no consumer-delete event occurred.
+ assertEquals(0L,
deleteTimesMark.getAllValues().stream().filter(f -> f ==
existingConsumerFuture).count());
// Server will not close the connection
assertTrue(channel.isOpen());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
new file mode 100644
index 00000000000..ab4e063ae3d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/** Producer/consumer tests that inject a delay into managed-ledger (ML) initialization. */
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setTopicLoadTimeoutSeconds(60 * 5);
+    }
+
+    /** Regression test for #22283: a retried subscribe must leave consumerList and consumerSet in sync. */
+    @Test(timeOut = 30 * 1000)
+    public void testConsumerListMatchesConsumerSet() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "sub";
+        final int clientOperationTimeout = 3;
+        final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
+        final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Create a client with a low operation timeout; @Cleanup closes it even if an assertion fails.
+        @lombok.Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
+                .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
+                .build();
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        // Delay the first ZooKeeper GET on this topic's path so that initializing the managed ledger (ML) takes
+        // longer than the client operation timeout: the first subscribe attempt times out and the client retries.
+        AtomicInteger delayTimes = new AtomicInteger();
+        mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> {
+            if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
+                return delayTimes.incrementAndGet() == 1;
+            }
+            return false;
+        });
+        admin.topics().unload(topicName);
+        // Verify: the dispatcher eventually holds exactly one consumer in both "consumerSet" and "consumerList".
+        Awaitility.await().atMost(Duration.ofMillis(loadMLDelayMillis * 3L))
+                .ignoreExceptions().untilAsserted(() -> {
+            Dispatcher dispatcher = pulsar.getBrokerService()
+                    .getTopic(topicName, false).join().get()
+                    .getSubscription(subName).getDispatcher();
+            ObjectSet<?> consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+            List<?> consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList");
+            log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size());
+            Assert.assertEquals(consumerList.size(), 1);
+            Assert.assertEquals(consumerSet.size(), 1);
+        });
+
+        // Verify: the topic can be deleted once the consumer is closed.
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+}