This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 32f9662 Fixed checking for `maxTopicsPerNamespace` (#9121)
32f9662 is described below
commit 32f96625bb5178a529ea5db9a4ffcf2638c5441d
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jan 5 12:48:36 2021 -0800
Fixed checking for `maxTopicsPerNamespace` (#9121)
* Fixed checking for `maxTopicsPerNamespace`
* Fixed other mocked test issue
Co-authored-by: penghui <[email protected]>
---
.../org/apache/pulsar/broker/admin/AdminResource.java | 15 +++++++++++++--
.../apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 ++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 53d6220..0d4f0dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -811,9 +811,20 @@ public abstract class AdminResource extends PulsarWebResource {
}
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse,
int numPartitions) {
- Integer maxTopicsPerNamespace;
+ Integer maxTopicsPerNamespace = null;
+
+ try {
+ Policies policies = getNamespacePolicies(namespaceName);
+ maxTopicsPerNamespace = policies.max_topics_per_namespace;
+ } catch (RestException e) {
+ if (e.getResponse().getStatus() != Status.NOT_FOUND.getStatusCode()) {
+ log.error("[{}] Failed to create partitioned topic {}", clientAppId(), namespaceName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+
try {
- maxTopicsPerNamespace = getNamespacePolicies(namespaceName).max_topics_per_namespace;
if (maxTopicsPerNamespace == null) {
maxTopicsPerNamespace =
pulsar().getConfig().getMaxTopicsPerNamespace();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8e490f1..2ebbbda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
@@ -299,6 +300,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName1,
nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1,
nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
+ doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor =
ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant,
testNamespace, partitionedTopicName, 5);