This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2442071  support namespace policy for maxTopicsPerNamespace limit 
(#9042)
2442071 is described below

commit 2442071d36a4141101dd00c8666ffbdcbe3bbbe2
Author: hangc0276 <[email protected]>
AuthorDate: Tue Jan 5 01:42:09 2021 +0800

    support namespace policy for maxTopicsPerNamespace limit (#9042)
    
    Related to #8225
    
    #### Changes
    1. Support namespace policy for `maxTopicsPerNamespace` limit
    2. Add the tests for this feature
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  19 +--
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  23 +++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  34 ++++++
 .../pulsar/broker/service/BrokerService.java       |  25 ++--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  16 +--
 .../apache/pulsar/broker/admin/NamespacesTest.java | 127 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 102 +++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      |  79 +++++++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  43 +++++++
 .../pulsar/common/policies/data/Policies.java      |   2 +
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 11 files changed, 447 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 58c72e9..53d6220 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -811,9 +811,14 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions) {
-        final int maxTopicsPerNamespace = 
pulsar().getConfig().getMaxTopicsPerNamespace();
-        if (maxTopicsPerNamespace > 0) {
-            try {
+        Integer maxTopicsPerNamespace;
+        try {
+            maxTopicsPerNamespace = 
getNamespacePolicies(namespaceName).max_topics_per_namespace;
+            if (maxTopicsPerNamespace == null) {
+                maxTopicsPerNamespace = 
pulsar().getConfig().getMaxTopicsPerNamespace();
+            }
+
+            if (maxTopicsPerNamespace > 0) {
                 List<String> partitionedTopics = 
getTopicPartitionList(TopicDomain.persistent);
                 if (partitionedTopics.size() + numPartitions > 
maxTopicsPerNamespace) {
                     log.error("[{}] Failed to create partitioned topic {}, "
@@ -822,11 +827,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
                             "Exceed maximum number of topics in namespace."));
                     return;
                 }
-            } catch (Exception e) {
-                log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
             }
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), namespaceName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
         }
 
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e7f5d2e..20397b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -3175,5 +3175,26 @@ public abstract class NamespacesBase extends 
AdminResource {
         return policies.offload_policies;
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
+    protected int internalGetMaxTopicsPerNamespace() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.READ);
+        return getNamespacePolicies(namespaceName).max_topics_per_namespace;
+    }
+
+   protected void internalRemoveMaxTopicsPerNamespace() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.WRITE);
+        internalSetMaxTopicsPerNamespace(null);
+   }
+
+   protected void internalSetMaxTopicsPerNamespace(Integer 
maxTopicsPerNamespace) {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        if (maxTopicsPerNamespace != null && maxTopicsPerNamespace < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "maxTopicsPerNamespace must be 0 or more");
+        }
+        internalSetPolicies("max_topics_per_namespace", maxTopicsPerNamespace);
+   }
+
+   private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 14460fc..781c37d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1488,5 +1488,39 @@ public class Namespaces extends NamespacesBase {
         return internalGetOffloadPolicies();
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+    @ApiOperation(value = "Get maxTopicsPerNamespace config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or namespace does not 
exist") })
+    public Integer getMaxTopicsPerNamespace(@PathParam("tenant") String tenant,
+                                              @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetMaxTopicsPerNamespace();
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+    @ApiOperation(value = "Set maxTopicsPerNamespace config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist"), })
+    public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+                                         @PathParam("namespace") String 
namespace,
+                                         @ApiParam(value = "Number of maximum 
topics for specific namespace",
+                                                 required = true) int 
maxTopicsPerNamespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetMaxTopicsPerNamespace(maxTopicsPerNamespace);
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/maxTopicsPerNamespace")
+    @ApiOperation(value = "Set maxTopicsPerNamespace config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist"), })
+    public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+                                         @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalRemoveMaxTopicsPerNamespace();
+    }
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a32d8b1..acab311 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2556,9 +2556,18 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int 
numPartitions,
                                             CompletableFuture<T> topicFuture) {
-        final int maxTopicsPerNamespace = 
pulsar().getConfig().getMaxTopicsPerNamespace();
-        if (maxTopicsPerNamespace > 0) {
-            try {
+        Integer maxTopicsPerNamespace;
+        try {
+            maxTopicsPerNamespace = 
pulsar.getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName.getNamespace()))
+                    .map(p -> p.max_topics_per_namespace)
+                    .orElse(null);
+
+            if (maxTopicsPerNamespace == null) {
+                maxTopicsPerNamespace = 
pulsar.getConfig().getMaxTopicsPerNamespace();
+            }
+
+            if (maxTopicsPerNamespace > 0) {
                 String partitionedTopicPath = 
PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE,
                         topicName.getNamespace(), 
topicName.getDomain().value());
                 List<String> topics = 
pulsar().getGlobalZkCache().getZooKeeper()
@@ -2570,13 +2579,13 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                             "Exceed maximum number of topics in namespace."));
                     return false;
                 }
-            } catch (KeeperException.NoNodeException e) {
-                // NoNode means there are no partitioned topics in this domain 
for this namespace
-            } catch (Exception e) {
-                log.error("Failed to create partitioned topic {}", topicName, 
e);
+            }
+        } catch (KeeperException.NoNodeException e) {
+            // NoNode means there are no partitioned topics in this domain for 
this namespace
+        } catch (Exception e) {
+            log.error("Failed to create partitioned topic {}", topicName, e);
                 topicFuture.completeExceptionally(new RestException(e));
                 return false;
-            }
         }
 
         return true;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 58357a8..849881b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1440,11 +1440,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         admin.tenants().createTenant("testTenant", tenantInfo);
         admin.namespaces().createNamespace("testTenant/ns1", 
Sets.newHashSet("test"));
 
-        pulsarClient.newProducer().topic(topic + "1").create();
-        pulsarClient.newProducer().topic(topic + "2").create();
-        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe();
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
         try {
-            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe();
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe().close();
             Assert.fail();
         } catch (PulsarClientException e) {
             log.info("Exception: ", e);
@@ -1459,11 +1459,11 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         admin.tenants().createTenant("testTenant", tenantInfo);
         admin.namespaces().createNamespace("testTenant/ns1", 
Sets.newHashSet("test"));
 
-        pulsarClient.newProducer().topic(topic + "1").create();
-        pulsarClient.newProducer().topic(topic + "2").create();
-        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe();
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
         try {
-            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe();
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe().close();
             Assert.fail();
         } catch (PulsarClientException e) {
             log.info("Exception: ", e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d51911a..d6f9a5c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.client.api.PulsarClientException;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -1413,6 +1414,132 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().deleteNamespace(namespace);
     }
 
+    @Test
+    public void testMaxTopicsPerNamespace() throws Exception {
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(15);
+        super.internalSetup();
+
+        String namespace = "testTenant/ns1";
+        admin.clusters().createCluster("use", new 
ClusterData(brokerUrl.toString()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"),
+                Sets.newHashSet("use"));
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+        assertEquals(10, 
admin.namespaces().getMaxTopicsPerNamespace(namespace));
+
+        // check create partitioned/non-partitioned topics using namespace 
policy
+        String topic = "persistent://testTenant/ns1/test_create_topic_v";
+        admin.topics().createPartitionedTopic(topic + "1", 2);
+        admin.topics().createPartitionedTopic(topic + "2", 3);
+        admin.topics().createPartitionedTopic(topic + "3", 4);
+        admin.topics().createNonPartitionedTopic(topic + "4");
+
+        try {
+            admin.topics().createPartitionedTopic(topic + "5", 2);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getHttpError(), "Exceed maximum number of topics in 
namespace.");
+        }
+
+        // remove namespace policy limit, use broker configuration instead.
+        admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+        admin.topics().createPartitionedTopic(topic + "6", 4);
+        try {
+            admin.topics().createPartitionedTopic(topic + "7", 3);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getHttpError(), "Exceed maximum number of topics in 
namespace.");
+        }
+
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 0);
+        // set namespace policy to no limit
+        for (int i = 0; i< 10; ++i) {
+            admin.topics().createPartitionedTopic(topic + "_v" + i, 2);
+            admin.topics().createNonPartitionedTopic(topic + "_vn" + i);
+        }
+
+
+        // check producer/consumer auto create partitioned topic
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreationType("partitioned");
+        super.internalSetup();
+
+        admin.clusters().createCluster("use", new 
ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
+
+        try {
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe().close();
+            fail();
+        } catch (PulsarClientException e) {
+            log.info("Exception: ", e);
+        }
+
+        // remove namespace limit
+        admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+        for (int i = 0; i < 10; ++i) {
+            pulsarClient.newProducer().topic(topic + "_p" + 
i).create().close();
+            pulsarClient.newConsumer().topic(topic + "_c" + 
i).subscriptionName("test_sub").subscribe().close();
+        }
+
+        // check producer/consumer auto create non-partitioned topic
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(1);
+        conf.setAllowAutoTopicCreationType("non-partitioned");
+        super.internalSetup();
+
+        admin.clusters().createCluster("use", new 
ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 3);
+
+        pulsarClient.newProducer().topic(topic + "1").create().close();
+        pulsarClient.newProducer().topic(topic + "2").create().close();
+        pulsarClient.newConsumer().topic(topic + 
"3").subscriptionName("test_sub").subscribe().close();
+
+        try {
+            pulsarClient.newConsumer().topic(topic + 
"4").subscriptionName("test_sub").subscribe().close();
+            fail();
+        } catch (PulsarClientException e) {
+            log.info("Exception: ", e);
+        }
+
+        // set namespace limit to 5
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 5);
+        pulsarClient.newProducer().topic(topic + "4").create().close();
+        pulsarClient.newProducer().topic(topic + "5").create().close();
+        try {
+            pulsarClient.newConsumer().topic(topic + 
"6").subscriptionName("test_sub").subscribe().close();
+            fail();
+        } catch (PulsarClientException e) {
+            log.info("Exception: ", e);
+        }
+
+        // remove namespace limit
+        admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+        for (int i = 0; i< 10; ++i) {
+            pulsarClient.newProducer().topic(topic + "_p" + 
i).create().close();
+            pulsarClient.newConsumer().topic(topic + "_c" + 
i).subscriptionName("test_sub").subscribe().close();
+        }
+
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(1);
+        conf.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
     private void assertInvalidRetentionPolicy(String namespace, int 
retentionTimeInMinutes, int retentionSizeInMB) {
         try {
             RetentionPolicies retention = new 
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bf2ced1..708bd3d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3502,4 +3502,106 @@ public interface Namespaces {
      *            Namespace name
      */
     CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String 
namespace);
+
+    /**
+     * Get maxTopicsPerNamespace for a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     *     <code>0</code>
+     * </pre>
+     * @param namespace
+     *              Namespace name
+     * @return
+     * @throws NotAuthorizedException
+     *              Don't have admin permission
+     * @throws NotFoundException
+     *              Namespace does not exist
+     * @throws PulsarAdminException
+     *              Unexpected error
+     */
+    int getMaxTopicsPerNamespace(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get maxTopicsPerNamespace for a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     *     <code>0</code>
+     * </pre>
+     * @param namespace
+     *          Namespace name
+     * @return
+     */
+    CompletableFuture<Integer> getMaxTopicsPerNamespaceAsync(String namespace);
+
+    /**
+     * Set maxTopicsPerNamespace for a namespace.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     *     <code>100</code>
+     * </pre>
+     *
+     * @param namespace
+     *              Namespace name
+     * @param maxTopicsPerNamespace
+     *              maxTopicsPerNamespace value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *              Don't have admin permission
+     * @throws NotFoundException
+     *              Namespace does not exist
+     * @throws PulsarAdminException
+     *              Unexpected error
+     */
+    void setMaxTopicsPerNamespace(String namespace, int maxTopicsPerNamespace) 
throws PulsarAdminException;
+
+    /**
+     * Set maxTopicsPerNamespace for a namespace asynchronously.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     *     <code>100</code>
+     * </pre>
+     *
+     * @param namespace
+     *              Namespace name
+     * @param maxTopicsPerNamespace
+     *              maxTopicsPerNamespace value for a namespace
+     * @return
+     */
+    CompletableFuture<Void> setMaxTopicsPerNamespaceAsync(String namespace, 
int maxTopicsPerNamespace);
+
+    /**
+     * Remove maxTopicsPerNamespace for a namespace.
+     *
+     * @param namespace
+     *              Namespace name
+     * @throws NotAuthorizedException
+     *              Don't have admin permission
+     * @throws NotFoundException
+     *              Namespace does not exist
+     * @throws PulsarAdminException
+     *              Unexpected error
+     */
+    void removeMaxTopicsPerNamespace(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Remove maxTopicsPerNamespace for a namespace asynchronously.
+     *
+     * @param namespace
+     *              Namespace name
+     * @throws NotAuthorizedException
+     *              Don't have admin permission
+     * @throws NotFoundException
+     *              Namespace does not exist
+     * @throws PulsarAdminException
+     *              Unexpected error
+     */
+    CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4cf55ae..6da28c5 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2883,6 +2883,85 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         return future;
     }
 
+    @Override
+    public int getMaxTopicsPerNamespace(String namespace) throws 
PulsarAdminException {
+        try {
+            return getMaxTopicsPerNamespaceAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxTopicsPerNamespaceAsync(String 
namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer maxTopicsPerNamespace) {
+                        future.complete(maxTopicsPerNamespace);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxTopicsPerNamespace(String namespace, int 
maxTopicsPerNamespace) throws PulsarAdminException {
+        try {
+            setMaxTopicsPerNamespaceAsync(namespace, maxTopicsPerNamespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxTopicsPerNamespaceAsync(String 
namespace, int maxTopicsPerNamespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+        return asyncPostRequest(path, Entity.entity(maxTopicsPerNamespace, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxTopicsPerNamespace(String namespace) throws 
PulsarAdminException {
+        try {
+            removeMaxTopicsPerNamespaceAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String 
namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
+        return asyncDeleteRequest(path);
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d2052cd..d755571 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1822,6 +1822,45 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set max topics per namespace")
+    private class SetMaxTopicsPerNamespace extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--max-topics-per-namespace", "-t"}, description = 
"max topics per namespace", required = true)
+        private int maxTopicsPerNamespace;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxTopicsPerNamespace(namespace, 
maxTopicsPerNamespace);
+        }
+    }
+
+    @Parameters(commandDescription = "Get max topics per namespace")
+    private class GetMaxTopicsPerNamespace extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxTopicsPerNamespace(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max topics per namespace")
+    private class RemoveMaxTopicsPerNamespace extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -1956,5 +1995,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-deduplication-snapshot-interval", new 
SetDeduplicationSnapshotInterval());
         jcommander.addCommand("get-deduplication-snapshot-interval", new 
GetDeduplicationSnapshotInterval());
         jcommander.addCommand("remove-deduplication-snapshot-interval", new 
RemoveDeduplicationSnapshotInterval());
+
+        jcommander.addCommand("set-max-topics-per-namespace", new 
SetMaxTopicsPerNamespace());
+        jcommander.addCommand("get-max-topics-per-namespace", new 
GetMaxTopicsPerNamespace());
+        jcommander.addCommand("remove-max-topics-per-namespace", new 
RemoveMaxTopicsPerNamespace());
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 6a748d3..469e3a8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -97,6 +97,8 @@ public class Policies {
     public long offload_threshold = -1;
     @SuppressWarnings("checkstyle:MemberName")
     public Long offload_deletion_lag_ms = null;
+    @SuppressWarnings("checkstyle:MemberName")
+    public Integer max_topics_per_namespace = null;
 
     @SuppressWarnings("checkstyle:MemberName")
     @Deprecated
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index c8fa3d7..81cfb2a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -45,4 +45,5 @@ public enum PolicyName {
     SUBSCRIPTION_AUTH_MODE,
     ENCRYPTION,
     TTL,
+    MAX_TOPICS
 }

Reply via email to