This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cba1600d0f6 [fix] [broker] Close dispatchers stuck due to mismatch
between dispatcher.consumerList and dispatcher.consumerSet (#22270)
cba1600d0f6 is described below
commit cba1600d0f6a82f1ea194f3214a80f283fe8dc27
Author: fengyubiao <[email protected]>
AuthorDate: Sat Mar 23 14:52:56 2024 +0800
[fix] [broker] Close dispatchers stuck due to mismatch between
dispatcher.consumerList and dispatcher.consumerSet (#22270)
---
.../PersistentDispatcherMultipleConsumers.java | 36 ++++++--
.../PersistentDispatcherMultipleConsumersTest.java | 101 +++++++++++++++++++++
2 files changed, 127 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index be82b190ffb..35204e7af72 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -217,15 +217,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
consumerList.remove(consumer);
log.info("Removed consumer {} with pending {} acks", consumer,
consumer.getPendingAcks().size());
if (consumerList.isEmpty()) {
- cancelPendingRead();
-
- redeliveryMessages.clear();
- redeliveryTracker.clear();
- if (closeFuture != null) {
- log.info("[{}] All consumers removed. Subscription is
disconnected", name);
- closeFuture.complete(null);
- }
- totalAvailablePermits = 0;
+ clearComponentsAfterRemovedAllConsumers();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer are left, reading more entries",
name);
@@ -242,8 +234,29 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
readMoreEntries();
}
} else {
- log.info("[{}] Trying to remove a non-connected consumer: {}",
name, consumer);
+ /**
+ * This is not an expected scenario, it will never happen in
expected.
+ * Just add a defensive code to avoid the topic can not be
unloaded anymore: remove the consumers which
+ * are not mismatch with {@link #consumerSet}. See more detail:
https://github.com/apache/pulsar/pull/22270.
+ */
+ log.error("[{}] Trying to remove a non-connected consumer: {}",
name, consumer);
+ consumerList.removeIf(c -> consumer.equals(c));
+ if (consumerList.isEmpty()) {
+ clearComponentsAfterRemovedAllConsumers();
+ }
+ }
+ }
+
+ private synchronized void clearComponentsAfterRemovedAllConsumers() {
+ cancelPendingRead();
+
+ redeliveryMessages.clear();
+ redeliveryTracker.clear();
+ if (closeFuture != null) {
+ log.info("[{}] All consumers removed. Subscription is
disconnected", name);
+ closeFuture.complete(null);
}
+ totalAvailablePermits = 0;
}
@Override
@@ -554,6 +567,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
if (consumerList.isEmpty()) {
closeFuture.complete(null);
} else {
+ // Iterator of CopyOnWriteArrayList uses the internal array to do
the for-each, and CopyOnWriteArrayList
+ // will create a new internal array when adding/removing a new
item. So remove items in the for-each
+ // block is safety when the for-each and add/remove are using a
same lock.
consumerList.forEach(consumer ->
consumer.disconnect(isResetCursor, assignedBrokerLookupData));
cancelPendingRead();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
new file mode 100644
index 00000000000..f24c5c5933e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class PersistentDispatcherMultipleConsumersTest extends
ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws
Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscription = "s1";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+ // Make an error that "consumerSet" is mismatch with "consumerList".
+ Dispatcher dispatcher = pulsar.getBrokerService()
+ .getTopic(topicName, false).join().get()
+ .getSubscription(subscription).getDispatcher();
+ ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
+ WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+ List<org.apache.pulsar.broker.service.Consumer> consumerList =
+ WhiteboxImpl.getInternalState(dispatcher, "consumerList");
+
+ org.apache.pulsar.broker.service.Consumer serviceConsumer =
consumerList.get(0);
+ consumerSet.add(serviceConsumer);
+ consumerList.add(serviceConsumer);
+
+ // Verify: the topic can be deleted successfully.
+ consumer.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testTopicDeleteIfConsumerSetMismatchConsumerList2() throws
Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscription = "s1";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+ // Make an error that "consumerSet" is mismatch with "consumerList".
+ Dispatcher dispatcher = pulsar.getBrokerService()
+ .getTopic(topicName, false).join().get()
+ .getSubscription(subscription).getDispatcher();
+ ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
+ WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+ consumerSet.clear();
+
+ // Verify: the topic can be deleted successfully.
+ consumer.close();
+ admin.topics().delete(topicName, false);
+ }
+}