This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebcd31642a5 [fix][broker] Avoid throwing RestException in
BrokerService (#20761)
ebcd31642a5 is described below
commit ebcd31642a543f1a5cc7a340c9d392da0ed890c4
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 11 14:05:59 2023 +0800
[fix][broker] Avoid throwing RestException in BrokerService (#20761)
## Motivation
The BrokerService should not throw RestException, because the method is not
called only by the admin tool. When a topic is auto-created during the creation
of a consumer/producer and the namespace already holds the maximum number of
topics allowed, returning a RestException (which the client receives as an
UnknownError) would be treated by the client as a retryable error.
## Modification
To address this issue, the BrokerService will now return a
NotAllowedException instead of a RestException when the topic count exceeds the
maximum limit allowed in the namespace.
---
.../pulsar/broker/service/BrokerService.java | 4 +--
.../apache/pulsar/broker/admin/AdminApi2Test.java | 30 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fb9329c5bae..668195c4b80 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -68,7 +68,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import javax.ws.rs.core.Response;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -163,7 +162,6 @@ import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.RateLimiter;
-import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.ChannelFutures;
@@ -3427,7 +3425,7 @@ public class BrokerService implements Closeable {
log.error("Failed to create persistent
topic {}, "
+ "exceed maximum number of
topics in namespace", topicName);
return FutureUtil.failedFuture(
- new
RestException(Response.Status.PRECONDITION_FAILED,
+ new NotAllowedException(
"Exceed maximum number
of topics in namespace."));
} else {
return
CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3e5281b8f92..d6176966d85 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -156,6 +156,36 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
setupClusters();
}
+ @Test
+ public void testExceptionOfMaxTopicsPerNamespaceCanBeHanle() throws
Exception {
+ super.internalCleanup();
+ conf.setMaxTopicsPerNamespace(3);
+ super.internalSetup();
+ String topic = "persistent://testTenant/ns1/test_create_topic_v";
+ TenantInfoImpl tenantInfo = new
TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+ Sets.newHashSet("test"));
+ // check producer/consumer auto create non-partitioned topic
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ admin.namespaces().createNamespace("testTenant/ns1",
Sets.newHashSet("test"));
+
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
+ try {
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub")
+ .subscribeAsync().get(5, TimeUnit.SECONDS);
+ Assert.fail();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof
PulsarClientException.NotAllowedException);
+ }
+
+ // reset configuration
+ conf.setMaxTopicsPerNamespace(0);
+ conf.setDefaultNumPartitions(1);
+ }
+
@Override
protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();