This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebcd31642a5  [fix][broker] Avoid throwing RestException in 
BrokerService (#20761)
ebcd31642a5 is described below

commit ebcd31642a543f1a5cc7a340c9d392da0ed890c4
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 11 14:05:59 2023 +0800

     [fix][broker] Avoid throwing RestException in BrokerService (#20761)
    
    ## Motivation
    The BrokerService should not throw RestException, because its methods are
    called not only by the admin tool. When a topic is auto-created while a
    consumer/producer is being created and the namespace already holds the
    maximum number of topics allowed, failing with a RestException reaches the
    client as an UnknownError, which is considered a retryable error.
    ## Modification
    To address this issue, the BrokerService now fails with a
    NotAllowedException instead of a RestException when the topic count exceeds
    the maximum limit allowed in the namespace.
---
 .../pulsar/broker/service/BrokerService.java       |  4 +--
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 30 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fb9329c5bae..668195c4b80 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -68,7 +68,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
-import javax.ws.rs.core.Response;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -163,7 +162,6 @@ import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.RateLimiter;
-import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
@@ -3427,7 +3425,7 @@ public class BrokerService implements Closeable {
                                         log.error("Failed to create persistent 
topic {}, "
                                                 + "exceed maximum number of 
topics in namespace", topicName);
                                         return FutureUtil.failedFuture(
-                                                new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                                new NotAllowedException(
                                                         "Exceed maximum number 
of topics in namespace."));
                                     } else {
                                         return 
CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3e5281b8f92..d6176966d85 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -156,6 +156,36 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         setupClusters();
     }
 
+    @Test
+    public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws 
Exception {
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(3);
+        super.internalSetup();
+        String topic = "persistent://testTenant/ns1/test_create_topic_v";
+        TenantInfoImpl tenantInfo = new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                Sets.newHashSet("test"));
+        // check producer/consumer auto create non-partitioned topic
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace("testTenant/ns1", 
Sets.newHashSet("test"));
+
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
+        try {
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub")
+                    .subscribeAsync().get(5, TimeUnit.SECONDS);
+            Assert.fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof 
PulsarClientException.NotAllowedException);
+        }
+
+        // reset configuration
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(1);
+    }
+
     @Override
     protected ServiceConfiguration getDefaultConf() {
         ServiceConfiguration conf = super.getDefaultConf();

Reply via email to