This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 981d1e37a88  [fix][broker] Avoid throwing RestException in 
BrokerService (#20761)
981d1e37a88 is described below

commit 981d1e37a888a05ac0e2a5692e9ac46b65b0b8ea
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 11 14:05:59 2023 +0800

     [fix][broker] Avoid throwing RestException in BrokerService (#20761)
    
    The BrokerService should not throw RestException, because this method is 
called by more than just the admin tool. When a topic is auto-created during the 
creation of a consumer/producer and exceeds the maximum number of topics allowed 
in the namespace, a RestException reaches the client as an UnknownError, which 
the client treats as a retryable error.
    To address this issue, the BrokerService will now return a 
NotAllowedException instead of a RestException when the topic count exceeds the 
maximum limit allowed in the namespace.
---
 .../pulsar/broker/service/BrokerService.java       |  4 +--
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 32 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 145fee720fd..7dcb9a6968f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -72,7 +72,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
-import javax.ws.rs.core.Response;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -163,7 +162,6 @@ import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.RateLimiter;
-import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
@@ -3250,7 +3248,7 @@ public class BrokerService implements Closeable {
                                         log.error("Failed to create persistent 
topic {}, "
                                                 + "exceed maximum number of 
topics in namespace", topicName);
                                         return FutureUtil.failedFuture(
-                                                new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                                new NotAllowedException(
                                                         "Exceed maximum number 
of topics in namespace."));
                                     } else {
                                         return 
CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index ae8f3fdd54b..3523c56c0da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -107,6 +107,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -141,6 +142,37 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("prop-xyz/ns1", 
Sets.newHashSet("test"));
     }
 
+    @Test
+    public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws 
Exception {
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(3);
+        super.internalSetup();
+        String topic = "persistent://testTenant/ns1/test_create_topic_v";
+        TenantInfoImpl tenantInfo = new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                Sets.newHashSet("test"));
+        // check producer/consumer auto create non-partitioned topic
+        
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED.toString());
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace("testTenant/ns1", 
Sets.newHashSet("test"));
+
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
+        try {
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub")
+                    .subscribeAsync().get(5, TimeUnit.SECONDS);
+            Assert.fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof 
PulsarClientException.NotAllowedException);
+        }
+
+        // reset configuration
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(1);
+    }
+
+
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {

Reply via email to