This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new abe6d79510c [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
abe6d79510c is described below
commit abe6d79510c9c743b60a4cfcf319ee27557763ed
Author: hrzzzz <[email protected]>
AuthorDate: Thu May 9 21:49:27 2024 +0800
[fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
Co-authored-by: ruihongzhou <[email protected]>
(cherry picked from commit 253e6506ea2c5ccc6afe1117e311cf24685ce4e9)
---
.../service/nonpersistent/NonPersistentTopic.java | 10 ----------
.../nonpersistent/NonPersistentTopicTest.java | 22 ++++++++++++++++++++++
2 files changed, 22 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index d19aeaa4b0f..86eab3d38b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
-import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
import static
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
@@ -58,7 +57,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.GetStatsOptions;
-import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
@@ -249,14 +247,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
return false;
}
- @Override
- public void removeProducer(Producer producer) {
- checkArgument(producer.getTopic() == this);
- if (producers.remove(producer.getProducerName(), producer)) {
- handleProducerRemoved(producer);
- }
- }
-
@Override
public CompletableFuture<Void>
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index b33381126e5..e2aec70fb11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import java.lang.reflect.Field;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -250,4 +252,24 @@ public class NonPersistentTopicTest extends BrokerTestBase {
Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
.until(() -> subscriptionMap.get(keySharedSubName) == null);
}
+
+
+ @Test
+ public void testRemoveProducerOnNonPersistentTopic() throws Exception {
+ final String topicName = "non-persistent://prop/ns-abc/topic_" +
UUID.randomUUID();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ NonPersistentTopic topic = (NonPersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ Field field =
AbstractTopic.class.getDeclaredField("userCreatedProducerCount");
+ field.setAccessible(true);
+ int userCreatedProducerCount = (int) field.get(topic);
+ assertEquals(userCreatedProducerCount, 1);
+
+ producer.close();
+ userCreatedProducerCount = (int) field.get(topic);
+ assertEquals(userCreatedProducerCount, 0);
+ }
}