This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9ddf78d4224003c5a6f8bd3d18458d9bdd322924 Author: hrzzzz <[email protected]> AuthorDate: Thu May 9 21:49:27 2024 +0800 [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685) Co-authored-by: ruihongzhou <[email protected]> (cherry picked from commit 253e6506ea2c5ccc6afe1117e311cf24685ce4e9) --- .../service/nonpersistent/NonPersistentTopic.java | 10 ---------- .../nonpersistent/NonPersistentTopicTest.java | 22 ++++++++++++++++++++++ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index ecc6134ecda..8af654633b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service.nonpersistent; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create; import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; @@ -57,7 +56,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.GetStatsOptions; -import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; @@ -248,14 +246,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol return false; } - @Override - public void removeProducer(Producer producer) { - checkArgument(producer.getTopic() == this); - if (producers.remove(producer.getProducerName(), producer)) { - handleProducerRemoved(producer); - } - } - @Override public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index b33381126e5..e2aec70fb11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import java.lang.reflect.Field; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.SubscriptionOption; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -250,4 +252,24 @@ public class NonPersistentTopicTest extends BrokerTestBase { Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) .until(() -> subscriptionMap.get(keySharedSubName) == null); } + + + @Test + public void testRemoveProducerOnNonPersistentTopic() throws Exception { + final String topicName = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID(); + + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + Field field = AbstractTopic.class.getDeclaredField("userCreatedProducerCount"); + field.setAccessible(true); + int userCreatedProducerCount = (int) field.get(topic); + assertEquals(userCreatedProducerCount, 1); + + producer.close(); + userCreatedProducerCount = (int) field.get(topic); + assertEquals(userCreatedProducerCount, 0); + } }
