This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89b545eaa8c25a5be5d5e92981474ff752f015db
Author: hrzzzz <[email protected]>
AuthorDate: Thu May 9 21:49:27 2024 +0800

    [fix][broker] Fix ProducerBusy issue due to incorrect 
userCreatedProducerCount on non-persistent topic (#22685)
    
    Co-authored-by: ruihongzhou <[email protected]>
    (cherry picked from commit 253e6506ea2c5ccc6afe1117e311cf24685ce4e9)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 10 ----------
 .../nonpersistent/NonPersistentTopicTest.java      | 22 ++++++++++++++++++++++
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 08d21f6591f..6e6d944c59b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
 import static 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
@@ -55,7 +54,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedExcept
 import 
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
@@ -244,14 +242,6 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         return false;
     }
 
-    @Override
-    public void removeProducer(Producer producer) {
-        checkArgument(producer.getTopic() == this);
-        if (producers.remove(producer.getProducerName(), producer)) {
-            handleProducerRemoved(producer);
-        }
-    }
-
     @Override
     public CompletableFuture<Void> 
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
         return  CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 73a1084f30f..766cc2353d4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import java.lang.reflect.Field;
+import java.util.UUID;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -119,4 +122,23 @@ public class NonPersistentTopicTest extends BrokerTestBase 
{
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
+
+    @Test
+    public void testRemoveProducerOnNonPersistentTopic() throws Exception {
+        final String topicName = "non-persistent://prop/ns-abc/topic_" + 
UUID.randomUUID();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        NonPersistentTopic topic = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        Field field = 
AbstractTopic.class.getDeclaredField("userCreatedProducerCount");
+        field.setAccessible(true);
+        int userCreatedProducerCount = (int) field.get(topic);
+        assertEquals(userCreatedProducerCount, 1);
+
+        producer.close();
+        userCreatedProducerCount = (int) field.get(topic);
+        assertEquals(userCreatedProducerCount, 0);
+    }
 }

Reply via email to