This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 981d1e37a88 [fix][broker] Avoid throwing RestException in
BrokerService (#20761)
981d1e37a88 is described below
commit 981d1e37a888a05ac0e2a5692e9ac46b65b0b8ea
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 11 14:05:59 2023 +0800
[fix][broker] Avoid throwing RestException in BrokerService (#20761)
The BrokerService should not throw RestException, because the method is not
only called by the admin tool. When a topic is auto-created while creating a
consumer/producer and the namespace already holds the maximum number of
topics allowed, the RestException reaches the client as an UnknownError,
which the client considers a retryable error.
To address this issue, the BrokerService will now return a
NotAllowedException instead of a RestException when the topic count exceeds
the maximum limit allowed in the namespace.
---
.../pulsar/broker/service/BrokerService.java | 4 +--
.../apache/pulsar/broker/admin/AdminApi2Test.java | 32 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 145fee720fd..7dcb9a6968f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -72,7 +72,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import javax.ws.rs.core.Response;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -163,7 +162,6 @@ import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.RateLimiter;
-import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.ChannelFutures;
@@ -3250,7 +3248,7 @@ public class BrokerService implements Closeable {
log.error("Failed to create persistent
topic {}, "
+ "exceed maximum number of
topics in namespace", topicName);
return FutureUtil.failedFuture(
- new
RestException(Response.Status.PRECONDITION_FAILED,
+ new NotAllowedException(
"Exceed maximum number
of topics in namespace."));
} else {
return
CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index ae8f3fdd54b..3523c56c0da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -107,6 +107,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -141,6 +142,37 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace("prop-xyz/ns1",
Sets.newHashSet("test"));
}
+ @Test
+ public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws
Exception {
+ super.internalCleanup();
+ conf.setMaxTopicsPerNamespace(3);
+ super.internalSetup();
+ String topic = "persistent://testTenant/ns1/test_create_topic_v";
+ TenantInfoImpl tenantInfo = new
TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+ Sets.newHashSet("test"));
+ // check producer/consumer auto create non-partitioned topic
+
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED.toString());
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ admin.namespaces().createNamespace("testTenant/ns1",
Sets.newHashSet("test"));
+
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
+ try {
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub")
+ .subscribeAsync().get(5, TimeUnit.SECONDS);
+ Assert.fail();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof
PulsarClientException.NotAllowedException);
+ }
+
+ // reset configuration
+ conf.setMaxTopicsPerNamespace(0);
+ conf.setDefaultNumPartitions(1);
+ }
+
+
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {