This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2442071 support namespace policy for maxTopicsPerNamespace limit
(#9042)
2442071 is described below
commit 2442071d36a4141101dd00c8666ffbdcbe3bbbe2
Author: hangc0276 <[email protected]>
AuthorDate: Tue Jan 5 01:42:09 2021 +0800
support namespace policy for maxTopicsPerNamespace limit (#9042)
Related to #8225
#### Changes
1. Support namespace policy for `maxTopicsPerNamespace` limit
2. Add the tests for this feature
---
.../apache/pulsar/broker/admin/AdminResource.java | 19 +--
.../pulsar/broker/admin/impl/NamespacesBase.java | 23 +++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 34 ++++++
.../pulsar/broker/service/BrokerService.java | 25 ++--
.../apache/pulsar/broker/admin/AdminApiTest2.java | 16 +--
.../apache/pulsar/broker/admin/NamespacesTest.java | 127 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 102 +++++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 79 +++++++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 43 +++++++
.../pulsar/common/policies/data/Policies.java | 2 +
.../pulsar/common/policies/data/PolicyName.java | 1 +
11 files changed, 447 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 58c72e9..53d6220 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -811,9 +811,14 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse,
int numPartitions) {
- final int maxTopicsPerNamespace =
pulsar().getConfig().getMaxTopicsPerNamespace();
- if (maxTopicsPerNamespace > 0) {
- try {
+ Integer maxTopicsPerNamespace;
+ try {
+ maxTopicsPerNamespace =
getNamespacePolicies(namespaceName).max_topics_per_namespace;
+ if (maxTopicsPerNamespace == null) {
+ maxTopicsPerNamespace =
pulsar().getConfig().getMaxTopicsPerNamespace();
+ }
+
+ if (maxTopicsPerNamespace > 0) {
List<String> partitionedTopics =
getTopicPartitionList(TopicDomain.persistent);
if (partitionedTopics.size() + numPartitions >
maxTopicsPerNamespace) {
log.error("[{}] Failed to create partitioned topic {}, "
@@ -822,11 +827,11 @@ public abstract class AdminResource extends
PulsarWebResource {
"Exceed maximum number of topics in namespace."));
return;
}
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
}
+ } catch (Exception e) {
+ log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), namespaceName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
}
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e7f5d2e..20397b8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -3175,5 +3175,26 @@ public abstract class NamespacesBase extends
AdminResource {
return policies.offload_policies;
}
- private static final Logger log =
LoggerFactory.getLogger(NamespacesBase.class);
+ protected int internalGetMaxTopicsPerNamespace() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS,
PolicyOperation.READ);
+ return getNamespacePolicies(namespaceName).max_topics_per_namespace;
+ }
+
+ protected void internalRemoveMaxTopicsPerNamespace() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS,
PolicyOperation.WRITE);
+ internalSetMaxTopicsPerNamespace(null);
+ }
+
+ protected void internalSetMaxTopicsPerNamespace(Integer
maxTopicsPerNamespace) {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ if (maxTopicsPerNamespace != null && maxTopicsPerNamespace < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "maxTopicsPerNamespace must be 0 or more");
+ }
+ internalSetPolicies("max_topics_per_namespace", maxTopicsPerNamespace);
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(NamespacesBase.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 14460fc..781c37d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1488,5 +1488,39 @@ public class Namespaces extends NamespacesBase {
return internalGetOffloadPolicies();
}
+ @GET
+ @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+ @ApiOperation(value = "Get maxTopicsPerNamespace config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace does not
exist") })
+ public Integer getMaxTopicsPerNamespace(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetMaxTopicsPerNamespace();
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+ @ApiOperation(value = "Set maxTopicsPerNamespace config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist"), })
+ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Number of maximum
topics for specific namespace",
+ required = true) int
maxTopicsPerNamespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetMaxTopicsPerNamespace(maxTopicsPerNamespace);
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+ @ApiOperation(value = "Set maxTopicsPerNamespace config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist"), })
+ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalRemoveMaxTopicsPerNamespace();
+ }
private static final Logger log =
LoggerFactory.getLogger(Namespaces.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a32d8b1..acab311 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2556,9 +2556,18 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int
numPartitions,
CompletableFuture<T> topicFuture) {
- final int maxTopicsPerNamespace =
pulsar().getConfig().getMaxTopicsPerNamespace();
- if (maxTopicsPerNamespace > 0) {
- try {
+ Integer maxTopicsPerNamespace;
+ try {
+ maxTopicsPerNamespace =
pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES,
topicName.getNamespace()))
+ .map(p -> p.max_topics_per_namespace)
+ .orElse(null);
+
+ if (maxTopicsPerNamespace == null) {
+ maxTopicsPerNamespace =
pulsar.getConfig().getMaxTopicsPerNamespace();
+ }
+
+ if (maxTopicsPerNamespace > 0) {
String partitionedTopicPath =
PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE,
topicName.getNamespace(),
topicName.getDomain().value());
List<String> topics =
pulsar().getGlobalZkCache().getZooKeeper()
@@ -2570,13 +2579,13 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
"Exceed maximum number of topics in namespace."));
return false;
}
- } catch (KeeperException.NoNodeException e) {
- // NoNode means there are no partitioned topics in this domain
for this namespace
- } catch (Exception e) {
- log.error("Failed to create partitioned topic {}", topicName,
e);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // NoNode means there are no partitioned topics in this domain for
this namespace
+ } catch (Exception e) {
+ log.error("Failed to create partitioned topic {}", topicName, e);
topicFuture.completeExceptionally(new RestException(e));
return false;
- }
}
return true;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 58357a8..849881b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1440,11 +1440,11 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1",
Sets.newHashSet("test"));
- pulsarClient.newProducer().topic(topic + "1").create();
- pulsarClient.newProducer().topic(topic + "2").create();
- pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe();
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
try {
- pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe();
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe().close();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
@@ -1459,11 +1459,11 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1",
Sets.newHashSet("test"));
- pulsarClient.newProducer().topic(topic + "1").create();
- pulsarClient.newProducer().topic(topic + "2").create();
- pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe();
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
try {
- pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe();
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe().close();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d51911a..d6f9a5c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.client.api.PulsarClientException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -1413,6 +1414,132 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace(namespace);
}
+ @Test
+ public void testMaxTopicsPerNamespace() throws Exception {
+ super.internalCleanup();
+ conf.setMaxTopicsPerNamespace(15);
+ super.internalSetup();
+
+ String namespace = "testTenant/ns1";
+ admin.clusters().createCluster("use", new
ClusterData(brokerUrl.toString()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"),
+ Sets.newHashSet("use"));
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+
+ admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+ assertEquals(10,
admin.namespaces().getMaxTopicsPerNamespace(namespace));
+
+ // check create partitioned/non-partitioned topics using namespace
policy
+ String topic = "persistent://testTenant/ns1/test_create_topic_v";
+ admin.topics().createPartitionedTopic(topic + "1", 2);
+ admin.topics().createPartitionedTopic(topic + "2", 3);
+ admin.topics().createPartitionedTopic(topic + "3", 4);
+ admin.topics().createNonPartitionedTopic(topic + "4");
+
+ try {
+ admin.topics().createPartitionedTopic(topic + "5", 2);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 412);
+ assertEquals(e.getHttpError(), "Exceed maximum number of topics in
namespace.");
+ }
+
+ // remove namespace policy limit, use broker configuration instead.
+ admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+ admin.topics().createPartitionedTopic(topic + "6", 4);
+ try {
+ admin.topics().createPartitionedTopic(topic + "7", 3);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 412);
+ assertEquals(e.getHttpError(), "Exceed maximum number of topics in
namespace.");
+ }
+
+ admin.namespaces().setMaxTopicsPerNamespace(namespace, 0);
+ // set namespace policy to no limit
+ for (int i = 0; i< 10; ++i) {
+ admin.topics().createPartitionedTopic(topic + "_v" + i, 2);
+ admin.topics().createNonPartitionedTopic(topic + "_vn" + i);
+ }
+
+
+ // check producer/consumer auto create partitioned topic
+ super.internalCleanup();
+ conf.setMaxTopicsPerNamespace(0);
+ conf.setDefaultNumPartitions(3);
+ conf.setAllowAutoTopicCreationType("partitioned");
+ super.internalSetup();
+
+ admin.clusters().createCluster("use", new
ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+ admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
+
+ try {
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe().close();
+ fail();
+ } catch (PulsarClientException e) {
+ log.info("Exception: ", e);
+ }
+
+ // remove namespace limit
+ admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+ for (int i = 0; i < 10; ++i) {
+ pulsarClient.newProducer().topic(topic + "_p" +
i).create().close();
+ pulsarClient.newConsumer().topic(topic + "_c" +
i).subscriptionName("test_sub").subscribe().close();
+ }
+
+ // check producer/consumer auto create non-partitioned topic
+ super.internalCleanup();
+ conf.setMaxTopicsPerNamespace(0);
+ conf.setDefaultNumPartitions(1);
+ conf.setAllowAutoTopicCreationType("non-partitioned");
+ super.internalSetup();
+
+ admin.clusters().createCluster("use", new
ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+ admin.namespaces().setMaxTopicsPerNamespace(namespace, 3);
+
+ pulsarClient.newProducer().topic(topic + "1").create().close();
+ pulsarClient.newProducer().topic(topic + "2").create().close();
+ pulsarClient.newConsumer().topic(topic +
"3").subscriptionName("test_sub").subscribe().close();
+
+ try {
+ pulsarClient.newConsumer().topic(topic +
"4").subscriptionName("test_sub").subscribe().close();
+ fail();
+ } catch (PulsarClientException e) {
+ log.info("Exception: ", e);
+ }
+
+ // set namespace limit to 5
+ admin.namespaces().setMaxTopicsPerNamespace(namespace, 5);
+ pulsarClient.newProducer().topic(topic + "4").create().close();
+ pulsarClient.newProducer().topic(topic + "5").create().close();
+ try {
+ pulsarClient.newConsumer().topic(topic +
"6").subscriptionName("test_sub").subscribe().close();
+ fail();
+ } catch (PulsarClientException e) {
+ log.info("Exception: ", e);
+ }
+
+ // remove namespace limit
+ admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+ for (int i = 0; i< 10; ++i) {
+ pulsarClient.newProducer().topic(topic + "_p" +
i).create().close();
+ pulsarClient.newConsumer().topic(topic + "_c" +
i).subscriptionName("test_sub").subscribe().close();
+ }
+
+ conf.setMaxTopicsPerNamespace(0);
+ conf.setDefaultNumPartitions(1);
+ conf.setAllowAutoTopicCreationType("non-partitioned");
+ }
+
private void assertInvalidRetentionPolicy(String namespace, int
retentionTimeInMinutes, int retentionSizeInMB) {
try {
RetentionPolicies retention = new
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bf2ced1..708bd3d 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3502,4 +3502,106 @@ public interface Namespaces {
* Namespace name
*/
CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String
namespace);
+
+ /**
+ * Get maxTopicsPerNamespace for a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ * @param namespace
+ * Namespace name
+ * @return the maxTopicsPerNamespace value configured for the namespace
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getMaxTopicsPerNamespace(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get maxTopicsPerNamespace for a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ * @param namespace
+ * Namespace name
+ * @return a future that completes with the maxTopicsPerNamespace value of the namespace
+ */
+ CompletableFuture<Integer> getMaxTopicsPerNamespaceAsync(String namespace);
+
+ /**
+ * Set maxTopicsPerNamespace for a namespace.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>100</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxTopicsPerNamespace
+ * maxTopicsPerNamespace value for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxTopicsPerNamespace(String namespace, int maxTopicsPerNamespace)
throws PulsarAdminException;
+
+ /**
+ * Set maxTopicsPerNamespace for a namespace asynchronously.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>100</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxTopicsPerNamespace
+ * maxTopicsPerNamespace value for a namespace
+ * @return a future that completes once the maxTopicsPerNamespace value has been set
+ */
+ CompletableFuture<Void> setMaxTopicsPerNamespaceAsync(String namespace,
int maxTopicsPerNamespace);
+
+ /**
+ * Remove maxTopicsPerNamespace for a namespace.
+ *
+ * @param namespace
+ * Namespace name
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeMaxTopicsPerNamespace(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Remove maxTopicsPerNamespace for a namespace asynchronously.
+ *
+ * @param namespace
+ * Namespace name
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4cf55ae..6da28c5 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2883,6 +2883,85 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
return future;
}
+ @Override
+ public int getMaxTopicsPerNamespace(String namespace) throws
PulsarAdminException {
+ try {
+ return getMaxTopicsPerNamespaceAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer> getMaxTopicsPerNamespaceAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer maxTopicsPerNamespace) {
+ future.complete(maxTopicsPerNamespace);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setMaxTopicsPerNamespace(String namespace, int
maxTopicsPerNamespace) throws PulsarAdminException {
+ try {
+ setMaxTopicsPerNamespaceAsync(namespace, maxTopicsPerNamespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setMaxTopicsPerNamespaceAsync(String
namespace, int maxTopicsPerNamespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+ return asyncPostRequest(path, Entity.entity(maxTopicsPerNamespace,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeMaxTopicsPerNamespace(String namespace) throws
PulsarAdminException {
+ try {
+ removeMaxTopicsPerNamespaceAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+ return asyncDeleteRequest(path);
+ }
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces :
adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d2052cd..d755571 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1822,6 +1822,45 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Set max topics per namespace")
+ private class SetMaxTopicsPerNamespace extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--max-topics-per-namespace", "-t"}, description =
"max topics per namespace", required = true)
+ private int maxTopicsPerNamespace;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setMaxTopicsPerNamespace(namespace,
maxTopicsPerNamespace);
+ }
+ }
+
+ @Parameters(commandDescription = "Get max topics per namespace")
+ private class GetMaxTopicsPerNamespace extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getMaxTopicsPerNamespace(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max topics per namespace")
+ private class RemoveMaxTopicsPerNamespace extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+ }
+ }
+
public CmdNamespaces(PulsarAdmin admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -1956,5 +1995,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-deduplication-snapshot-interval", new
SetDeduplicationSnapshotInterval());
jcommander.addCommand("get-deduplication-snapshot-interval", new
GetDeduplicationSnapshotInterval());
jcommander.addCommand("remove-deduplication-snapshot-interval", new
RemoveDeduplicationSnapshotInterval());
+
+ jcommander.addCommand("set-max-topics-per-namespace", new
SetMaxTopicsPerNamespace());
+ jcommander.addCommand("get-max-topics-per-namespace", new
GetMaxTopicsPerNamespace());
+ jcommander.addCommand("remove-max-topics-per-namespace", new
RemoveMaxTopicsPerNamespace());
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 6a748d3..469e3a8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -97,6 +97,8 @@ public class Policies {
public long offload_threshold = -1;
@SuppressWarnings("checkstyle:MemberName")
public Long offload_deletion_lag_ms = null;
+ @SuppressWarnings("checkstyle:MemberName")
+ public Integer max_topics_per_namespace = null;
@SuppressWarnings("checkstyle:MemberName")
@Deprecated
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index c8fa3d7..81cfb2a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -45,4 +45,5 @@ public enum PolicyName {
SUBSCRIPTION_AUTH_MODE,
ENCRYPTION,
TTL,
+ MAX_TOPICS
}