This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c28378  Support topic-level max message size (#8732)
9c28378 is described below

commit 9c28378aea7da83164938c84fa6b55d0474fbbd8
Author: feynmanlin <[email protected]>
AuthorDate: Mon Nov 30 18:30:18 2020 +0800

    Support topic-level max message size (#8732)
    
    fix https://github.com/streamnative/pulsar/issues/1723
    
    ### Motivation
    The current policy to control the size of the message is at the broker 
level(maxMessageSize). It becomes easier to plan resource quotas for client 
allocation if the max message size pushed can be given at the topic level.
    
    ### Modifications
    
    Now the broker-level `maxMessageSize` is returned by the broker to the 
client, when the broker handles `handleConnected`. The client will cached 
`maxMessageSize` locally. An exception will be thrown if it exceeds the limit.
    
    Topic-level cannot be implemented like this, because:
    1) When `handleConnected`, the command received by the broker does not 
contain specific topic information, so it is not known which topic policy to 
return to the client.
    2) The client cannot cache topic-level policy. Unlike the broker-level 
policy, which will not change, the topic-level policy will change dynamically, 
which will involve cache consistency issues.
    
    I think the best way to handle this is to let the broker determine whether 
it exceeds the limit, and return an exception if it exceeds the limit, and 
handle the exception by the client's `handleSendError`.
    
    ### Verifying this change
    TopicPoliciesTest.java
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 29 +++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 84 +++++++++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       | 15 ++++
 .../org/apache/pulsar/broker/service/Producer.java | 15 +++-
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++
 .../broker/service/persistent/PersistentTopic.java | 12 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 97 ++++++++++++++++++++--
 .../org/apache/pulsar/client/admin/Topics.java     | 51 ++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 76 +++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  7 ++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 43 ++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +
 .../apache/pulsar/client/impl/ProducerImpl.java    | 19 +++++
 .../pulsar/common/policies/data/TopicPolicies.java |  5 ++
 14 files changed, 452 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 18faede..c0cb9ef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2607,6 +2607,35 @@ public class PersistentTopicsBase extends AdminResource {
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
     }
 
+    protected CompletableFuture<Void> internalSetMaxMessageSize(Integer 
maxMessageSize) {
+        if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > 
config().getMaxMessageSize())) {
+            throw new RestException(Status.PRECONDITION_FAILED
+                    , "topic-level maxMessageSize must be greater than or 
equal to 0 " +
+                    "and must be smaller than that in the broker-level");
+        }
+
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        TopicPolicies topicPolicies = 
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        topicPolicies.setMaxMessageSize(maxMessageSize);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+    }
+
+    protected Optional<Integer> internalGetMaxMessageSize() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return 
getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
+    }
+
     protected Optional<Integer> internalGetMaxProducers() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 84f53a2..f329512 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1918,6 +1918,90 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Get maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is 
disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
+                                  @PathParam("tenant") String tenant,
+                                  @PathParam("namespace") String namespace,
+                                  @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> policies = internalGetMaxMessageSize();
+            if (policies.isPresent()) {
+                asyncResponse.resume(policies.get());
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Set maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is 
disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of 
maxConsumers")})
+    public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
+                                  @PathParam("tenant") String tenant,
+                                  @PathParam("namespace") String namespace,
+                                  @PathParam("topic") @Encoded String 
encodedTopic,
+                                  @ApiParam(value = "The max message size of 
the topic") int maxMessageSize) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed updated persistence policies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed updated persistence policies", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully set max message size: 
namespace={}, topic={}, maxMessageSiz={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName(),
+                        maxMessageSize);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Remove maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is 
disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxMessageSize(@Suspended final AsyncResponse 
asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove maxMessageSize", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove max message size: 
namespace={}, topic={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/terminate")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fa741ed..341193e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -571,6 +571,21 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
+    protected boolean isExceedMaximumMessageSize(int size) {
+        Integer maxMessageSize = null;
+        TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
+        if (topicPolicies != null && topicPolicies.isMaxMessageSizeSet()) {
+            maxMessageSize = topicPolicies.getMaxMessageSize();
+        }
+        if (maxMessageSize != null) {
+            if (maxMessageSize == 0) {
+                return false;
+            }
+            return size > maxMessageSize;
+        }
+        return false;
+    }
+
     /**
      * update topic publish dispatcher for this topic.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index eb22442..012e7f7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -356,8 +356,7 @@ public class Producer {
         @Override
         public void completed(Exception exception, long ledgerId, long 
entryId) {
             if (exception != null) {
-                ServerError serverError = (exception instanceof 
TopicTerminatedException)
-                        ? ServerError.TopicTerminatedError : 
ServerError.PersistenceError;
+                final ServerError serverError = getServerError(exception);
 
                 producer.cnx.execute(() -> {
                     if (!(exception instanceof TopicClosedException)) {
@@ -383,6 +382,18 @@ public class Producer {
             }
         }
 
+        private ServerError getServerError(Exception exception) {
+            ServerError serverError;
+            if (exception instanceof TopicTerminatedException) {
+                serverError = ServerError.TopicTerminatedError;
+            } else if (exception instanceof 
BrokerServiceException.NotAllowedException) {
+                serverError = ServerError.NotAllowedError;
+            } else {
+                serverError = ServerError.PersistenceError;
+            }
+            return serverError;
+        }
+
         /**
          * Executed from I/O thread when sending receipt back to client
          */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4b72edc..551a9cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -160,6 +160,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
 
     @Override
     public void publishMessage(ByteBuf data, PublishContext callback) {
+        if (isExceedMaximumMessageSize(data.readableBytes())) {
+            callback.completed(new NotAllowedException("Exceed maximum message 
size")
+                    , -1, -1);
+            return;
+        }
         callback.completed(null, 0L, 0L);
         ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c686519..9a8f1dec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -339,6 +339,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+            publishContext.completed(new NotAllowedException("Exceed maximum 
message size")
+                    , -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status = 
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
         switch (status) {
@@ -2454,6 +2460,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+            publishContext.completed(new NotAllowedException("Exceed maximum 
message size")
+                    , -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status = 
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
         switch (status) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e2165d2..8fe2be2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,32 +18,31 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
-import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
-import org.apache.pulsar.common.policies.data.SubscribeRate;
-import static org.testng.Assert.assertEquals;
-
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -55,6 +54,12 @@ import java.lang.reflect.Field;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 @Slf4j
 public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
@@ -91,6 +96,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
+        this.resetConfig();
     }
 
     @Test
@@ -1160,4 +1166,81 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
         Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
     }
+
+    @Test(timeOut = 20000)
+    public void testTopicMaxMessageSizeApi() throws Exception{
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
+        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
+
+        admin.topics().setMaxMessageSize(persistenceTopic,10);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> 
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic))
 != null);
+        
assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10);
+
+        admin.topics().removeMaxMessageSize(persistenceTopic);
+        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
+
+        try {
+            
admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(),412);
+        }
+        try {
+            admin.topics().setMaxMessageSize(persistenceTopic, -1);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(),412);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testTopicMaxMessageSize() throws Exception{
+        doTestTopicMaxMessageSize(true);
+        doTestTopicMaxMessageSize(false);
+    }
+
+    private void doTestTopicMaxMessageSize(boolean isPartitioned) throws 
Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        if (isPartitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+        // init cache
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        assertNull(admin.topics().getMaxMessageSize(topic));
+        // set msg size
+        admin.topics().setMaxMessageSize(topic, 10);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> 
pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != 
null);
+        assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);
+
+        try {
+            producer.send(new byte[1024]);
+        } catch (PulsarClientException e) {
+            assertTrue(e instanceof PulsarClientException.NotAllowedException);
+        }
+
+        admin.topics().removeMaxMessageSize(topic);
+        assertNull(admin.topics().getMaxMessageSize(topic));
+
+        try {
+            admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+        }
+        try {
+            admin.topics().setMaxMessageSize(topic, -1);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+        }
+
+        MessageId messageId = producer.send(new byte[1024]);
+        assertNotNull(messageId);
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0ae525f..4c8a8ffa 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2455,6 +2455,57 @@ public interface Topics {
      * @param topic Topic name
      */
     CompletableFuture<Void> removeMaxProducersAsync(String topic);
+    /**
+     * Get the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @return Configuration of bookkeeper persistence policies
+     * @throws PulsarAdminException Unexpected error
+     */
+    Integer getMaxMessageSize(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the max message size for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     * @return Configuration of bookkeeper persistence policies
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Integer> getMaxMessageSizeAsync(String topic);
+
+
+    /**
+     * Set the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @param maxMessageSize Max message size of producer
+     * @throws PulsarAdminException Unexpected error
+     */
+    void setMaxMessageSize(String topic, int maxMessageSize) throws 
PulsarAdminException;
+
+    /**
+     * Set the max message size for specified topic asynchronously.0 disables.
+     *
+     * @param topic Topic name
+     * @param maxMessageSize Max message size of topic
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int 
maxMessageSize);
+
+    /**
+     * Remove the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @throws PulsarAdminException Unexpected error
+     */
+    void removeMaxMessageSize(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the max message size for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     */
+    CompletableFuture<Void> removeMaxMessageSizeAsync(String topic);
 
     /**
      * Get the max number of consumer for specified topic.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0d4c66c..7c3e149 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2636,6 +2636,82 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public Integer getMaxMessageSize(String topic) throws PulsarAdminException 
{
+        try {
+            return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer maxMessageSize) {
+                        future.complete(maxMessageSize);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxMessageSize(String topic, int maxMessageSize) throws 
PulsarAdminException {
+        try {
+            setMaxMessageSizeAsync(topic, 
maxMessageSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int 
maxMessageSize) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        return asyncPostRequest(path, Entity.entity(maxMessageSize, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxMessageSize(String topic) throws PulsarAdminException 
{
+        try {
+            removeMaxMessageSizeAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxMessageSizeAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxConsumers(String topic) throws PulsarAdminException {
         try {
             return getMaxConsumersAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5453a86..927ee73 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -778,6 +778,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("set-max-unacked-messages-on-subscription 
persistent://myprop/clust/ns1/ds1 -m 99"));
         
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1",
 99);
 
+        cmdTopics.run(split("get-max-message-size 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).getMaxMessageSize("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-max-message-size 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).removeMaxMessageSize("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-message-size 
persistent://myprop/clust/ns1/ds1 -m 99"));
+        
verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);
+
         cmdTopics.run(split("get-deduplication-snapshot-interval 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("remove-deduplication-snapshot-interval 
persistent://myprop/clust/ns1/ds1"));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c5ed51d..4519568 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -165,6 +165,10 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-maxProducers", new SetMaxProducers());
         jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
 
+        jcommander.addCommand("get-max-message-size", new GetMaxMessageSize());
+        jcommander.addCommand("set-max-message-size", new SetMaxMessageSize());
+        jcommander.addCommand("remove-max-message-size", new 
RemoveMaxMessageSize());
+
         jcommander.addCommand("get-max-consumers-per-subscription", new 
GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new 
SetMaxConsumersPerSubscription());
         jcommander.addCommand("remove-max-consumers-per-subscription", new 
RemoveMaxConsumersPerSubscription());
@@ -1692,6 +1696,45 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get max message size for a topic")
+    private class GetMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxMessageSize(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max message size for a topic")
+    private class SetMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--max-message-size", "-m"}, description = "Max 
message size for a topic", required = true)
+        private int maxMessageSize;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxMessageSize(persistentTopic, maxMessageSize);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max message size for a topic")
+    private class RemoveMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxMessageSize(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get max consumers per subscription for a 
topic")
     private class GetMaxConsumersPerSubscription extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8a008e8..42c3fac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -605,6 +605,9 @@ public class ClientCnx extends PulsarHandler {
         case TopicTerminatedError:
             producers.get(producerId).terminated(this);
             break;
+        case NotAllowedError:
+            producers.get(producerId).recoverNotAllowedError(sequenceId);
+            break;
 
         default:
             // By default, for transient error, let the reconnection logic
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1efc0c9..3ab4207 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1039,6 +1039,25 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         resendMessages(cnx);
     }
 
+    protected synchronized void recoverNotAllowedError(long sequenceId) {
+        OpSendMsg op = pendingMessages.peek();
+        if(op != null && sequenceId == getHighestSequenceId(op)){
+            pendingMessages.remove();
+            releaseSemaphoreForSendOp(op);
+            try {
+                op.callback.sendComplete(
+                        new PulsarClientException.NotAllowedException(
+                                format("The size of the message which is 
produced by producer %s to the topic " +
+                                        "%s is not allowed", producerName, 
topic)));
+            } catch (Throwable t) {
+                log.warn("[{}] [{}] Got exception while completing the 
callback for msg {}:", topic,
+                        producerName, sequenceId, t);
+            }
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+    }
+
     /**
      * Computes checksum again and verifies it against existing checksum. If 
checksum doesn't match it means that
      * message is corrupt.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 47f0ec1..c1b4c50 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -55,6 +55,11 @@ public class TopicPolicies {
     private PublishRate publishRate = null;
     private SubscribeRate subscribeRate = null;
     private Integer deduplicationSnapshotIntervalSeconds = null;
+    private Integer maxMessageSize = null;
+
+    public boolean isMaxMessageSizeSet() {
+        return maxMessageSize != null;
+    }
 
     public boolean isDeduplicationSnapshotIntervalSecondsSet(){
         return deduplicationSnapshotIntervalSeconds != null;

Reply via email to