This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66688d1 support message publish rate on topic level (#7948)
66688d1 is described below
commit 66688d15801119b7c62efa504e8267564acbe403
Author: hangc0276 <[email protected]>
AuthorDate: Wed Sep 2 21:11:51 2020 +0800
support message publish rate on topic level (#7948)
Modifications
Support set publish rate on topic level.
Support get publish rate on topic level.
Support remove publish rate on topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 65 +++++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 88 +++++++++
.../pulsar/broker/service/AbstractTopic.java | 67 ++++++-
.../broker/service/PrecisPublishLimiter.java | 101 +++++++++++
.../pulsar/broker/service/PublishRateLimiter.java | 198 ---------------------
.../broker/service/PublishRateLimiterDisable.java | 65 +++++++
.../broker/service/PublishRateLimiterImpl.java | 104 +++++++++++
.../broker/service/persistent/PersistentTopic.java | 52 ++++--
.../broker/admin/TopicPoliciesDisableTest.java | 21 +++
.../pulsar/broker/admin/TopicPoliciesTest.java | 39 ++++
.../org/apache/pulsar/client/admin/Topics.java | 62 ++++++-
.../pulsar/client/admin/internal/TopicsImpl.java | 79 +++++++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 49 +++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 +
14 files changed, 757 insertions(+), 238 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a78ead7..a25bf37 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,9 +97,8 @@ import
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.protocol.Commands;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
@@ -3128,18 +3127,60 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
- if (topicName.isGlobal()) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
- }
- checkTopicLevelPolicyEnable();
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setCompactionThreshold(null);
- return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
+ }
+ topicPolicies.get().setCompactionThreshold(null);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
+ }
+
+ protected Optional<PublishRate> internalGetPublishRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
+
+ }
+
+ protected CompletableFuture<Void> internalSetPublishRate(PublishRate
publishRate) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ if (publishRate == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicPolicies topicPolicies = getTopicPolicies(topicName)
+ .orElseGet(TopicPolicies::new);
+ topicPolicies.setPublishRate(publishRate);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ }
+
+ protected CompletableFuture<Void> internalRemovePublishRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ topicPolicies.get().setPublishRate(null);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies.get());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c98867a..639a79e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -61,6 +61,7 @@ import
org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1871,5 +1872,92 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/publishRate")
+ @ApiOperation(value = "Get publish rate configuration for specified
topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<PublishRate> publishRate = internalGetPublishRate();
+ if (!publishRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(publishRate.get());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/publishRate")
+ @ApiOperation(value = "Set message publish rate configuration for
specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not
exist"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @ApiParam(value = "Dispatch rate for the
specified topic") PublishRate publishRate) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed to set topic dispatch rate", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed to set topic dispatch rate");
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ try {
+ log.info("[{}] Successfully set topic publish rate:
tenant={}, namespace={}, topic={}, publishRate={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ jsonMapper().writeValueAsString(publishRate));
+ } catch (JsonProcessingException ignore) {}
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/publishRate")
+ @ApiOperation(value = "Remove message publish rate configuration for
specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not
exist"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled,
please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String
encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemovePublishRate().whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove topic publish rate", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove topic publish rate:
tenant={}, namespace={}, topic={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0d56ca0..bd4ad24 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -24,6 +24,7 @@ import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -44,6 +46,7 @@ import
org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
@@ -94,6 +97,8 @@ public abstract class AbstractTopic implements Topic {
protected boolean preciseTopicPublishRateLimitingEnable;
+ protected volatile PublishRate topicPolicyPublishRate = null;
+
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
@@ -110,17 +115,28 @@ public abstract class AbstractTopic implements Topic {
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
- Policies policies = null;
- try {
- policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ ServiceConfiguration brokerConfig =
brokerService.pulsar().getConfiguration();
+ if (brokerConfig.isSystemTopicEnabled() &&
brokerConfig.isSystemTopicEnabled()) {
+ topicPolicyPublishRate =
Optional.ofNullable(getTopicPolicies(TopicName.get(topic)))
+ .map(TopicPolicies::getPublishRate)
+ .orElse(null);
+ }
+ if (topicPolicyPublishRate != null) {
+ // update topic level publish dispatcher
+ updateTopicPublishDispatcher();
+ } else {
+ Policies policies = null;
+ try {
+ policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
- } catch (Exception e) {
- log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
- }
- this.preciseTopicPublishRateLimitingEnable =
+ } catch (Exception e) {
+ log.warn("[{}] Error getting policies {} and publish
throttling will be disabled", topic, e.getMessage());
+ }
+ this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
- updatePublishDispatcher(policies);
+ updatePublishDispatcher(policies);
+ }
}
protected boolean isProducersExceeded() {
@@ -466,6 +482,12 @@ public abstract class AbstractTopic implements Topic {
}
private void updatePublishDispatcher(Policies policies) {
+ // if topic level publish rate policy is set, skip update publish rate
on namespace level
+ if (topicPolicyPublishRate != null) {
+ log.info("Using topic policy publish rate instead of namespace
level topic publish rate on topic {}", this.topic);
+ return;
+ }
+
final String clusterName =
brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null &&
policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
@@ -532,4 +554,33 @@ public abstract class AbstractTopic implements Topic {
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
+
+ /**
+ * Get {@link TopicPolicies} for this topic.
+ * @param topicName
+ * @return TopicPolicies is exist else return null.
+ */
+ public TopicPolicies getTopicPolicies(TopicName topicName) {
+ TopicName cloneTopicName = topicName;
+ if (topicName.isPartitioned()) {
+ cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ }
+ try {
+ return
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
+ return null;
+ } catch (NullPointerException e) {
+ log.warn("Topic level policies are not enabled. " +
+ "Please refer to systemTopicEnabled and
topicLevelPoliciesEnabled on broker.conf");
+ return null;
+ }
+ }
+
+ /**
+ * update topic publish dispatcher for this topic.
+ */
+ protected void updateTopicPublishDispatcher() {
+ // noop
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
new file mode 100644
index 0000000..a555104
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.util.RateLimitFunction;
+import org.apache.pulsar.common.util.RateLimiter;
+
+import java.util.concurrent.TimeUnit;
+
+public class PrecisPublishLimiter implements PublishRateLimiter {
+ protected volatile int publishMaxMessageRate = 0;
+ protected volatile long publishMaxByteRate = 0;
+ protected volatile boolean publishThrottlingEnabled = false;
+ // precise mode for publish rate limiter
+ private RateLimiter topicPublishRateLimiterOnMessage;
+ private RateLimiter topicPublishRateLimiterOnByte;
+ private final RateLimitFunction rateLimitFunction;
+
+ public PrecisPublishLimiter(Policies policies, String clusterName,
RateLimitFunction rateLimitFunction) {
+ this.rateLimitFunction = rateLimitFunction;
+ update(policies, clusterName);
+ }
+
+ public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction
rateLimitFunction) {
+ this.rateLimitFunction = rateLimitFunction;
+ update(publishRate);
+ }
+
+ @Override
+ public void checkPublishRate() {
+ // No-op
+ }
+
+ @Override
+ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+ // No-op
+ }
+
+ @Override
+ public boolean resetPublishCount() {
+ return true;
+ }
+
+ @Override
+ public boolean isPublishRateExceeded() {
+ return false;
+ }
+
+
+ @Override
+ public void update(Policies policies, String clusterName) {
+ final PublishRate maxPublishRate = policies.publishMaxMessageRate !=
null
+ ? policies.publishMaxMessageRate.get(clusterName)
+ : null;
+ this.update(maxPublishRate);
+ }
+ public void update(PublishRate maxPublishRate) {
+ if (maxPublishRate != null
+ && (maxPublishRate.publishThrottlingRateInMsg > 0 ||
maxPublishRate.publishThrottlingRateInByte > 0)) {
+ this.publishThrottlingEnabled = true;
+ this.publishMaxMessageRate =
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+ this.publishMaxByteRate =
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+ if (this.publishMaxMessageRate > 0) {
+ topicPublishRateLimiterOnMessage = new
RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
+ }
+ if (this.publishMaxByteRate > 0) {
+ topicPublishRateLimiterOnByte = new
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
+ }
+ } else {
+ this.publishMaxMessageRate = 0;
+ this.publishMaxByteRate = 0;
+ this.publishThrottlingEnabled = false;
+ topicPublishRateLimiterOnMessage = null;
+ topicPublishRateLimiterOnByte = null;
+ }
+ }
+
+ @Override
+ public boolean tryAcquire(int numbers, long bytes) {
+ return (topicPublishRateLimiterOnMessage == null ||
topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
+ (topicPublishRateLimiterOnByte == null ||
topicPublishRateLimiterOnByte.tryAcquire(bytes));
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
index 08d7e5b..67e6c77 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
@@ -18,13 +18,8 @@
*/
package org.apache.pulsar.broker.service;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
-import org.apache.pulsar.common.util.RateLimitFunction;
-import org.apache.pulsar.common.util.RateLimiter;
public interface PublishRateLimiter {
@@ -76,196 +71,3 @@ public interface PublishRateLimiter {
* */
boolean tryAcquire(int numbers, long bytes);
}
-
-class PrecisPublishLimiter implements PublishRateLimiter {
- protected volatile int publishMaxMessageRate = 0;
- protected volatile long publishMaxByteRate = 0;
- protected volatile boolean publishThrottlingEnabled = false;
- // precise mode for publish rate limiter
- private RateLimiter topicPublishRateLimiterOnMessage;
- private RateLimiter topicPublishRateLimiterOnByte;
- private final RateLimitFunction rateLimitFunction;
-
- public PrecisPublishLimiter(Policies policies, String clusterName,
RateLimitFunction rateLimitFunction) {
- this.rateLimitFunction = rateLimitFunction;
- update(policies, clusterName);
- }
-
- @Override
- public void checkPublishRate() {
- // No-op
- }
-
- @Override
- public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
- // No-op
- }
-
- @Override
- public boolean resetPublishCount() {
- return true;
- }
-
- @Override
- public boolean isPublishRateExceeded() {
- return false;
- }
-
-
- @Override
- public void update(Policies policies, String clusterName) {
- final PublishRate maxPublishRate = policies.publishMaxMessageRate !=
null
- ? policies.publishMaxMessageRate.get(clusterName)
- : null;
- this.update(maxPublishRate);
- }
- public void update(PublishRate maxPublishRate) {
- if (maxPublishRate != null
- && (maxPublishRate.publishThrottlingRateInMsg > 0 ||
maxPublishRate.publishThrottlingRateInByte > 0)) {
- this.publishThrottlingEnabled = true;
- this.publishMaxMessageRate =
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
- this.publishMaxByteRate =
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
- if (this.publishMaxMessageRate > 0) {
- topicPublishRateLimiterOnMessage = new
RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
- }
- if (this.publishMaxByteRate > 0) {
- topicPublishRateLimiterOnByte = new
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
- }
- } else {
- this.publishMaxMessageRate = 0;
- this.publishMaxByteRate = 0;
- this.publishThrottlingEnabled = false;
- topicPublishRateLimiterOnMessage = null;
- topicPublishRateLimiterOnByte = null;
- }
- }
-
- @Override
- public boolean tryAcquire(int numbers, long bytes) {
- return (topicPublishRateLimiterOnMessage == null ||
topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
- (topicPublishRateLimiterOnByte == null ||
topicPublishRateLimiterOnByte.tryAcquire(bytes));
- }
-}
-
-class PublishRateLimiterImpl implements PublishRateLimiter {
- protected volatile int publishMaxMessageRate = 0;
- protected volatile long publishMaxByteRate = 0;
- protected volatile boolean publishThrottlingEnabled = false;
- protected volatile boolean publishRateExceeded = false;
- protected volatile LongAdder currentPublishMsgCount = new LongAdder();
- protected volatile LongAdder currentPublishByteCount = new LongAdder();
-
- public PublishRateLimiterImpl(Policies policies, String clusterName) {
- update(policies, clusterName);
- }
-
- public PublishRateLimiterImpl(PublishRate maxPublishRate) {
- update(maxPublishRate);
- }
-
- @Override
- public void checkPublishRate() {
- if (this.publishThrottlingEnabled && !publishRateExceeded) {
- long currentPublishMsgRate = this.currentPublishMsgCount.sum();
- long currentPublishByteRate = this.currentPublishByteCount.sum();
- if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate >
this.publishMaxMessageRate)
- || (this.publishMaxByteRate > 0 && currentPublishByteRate
> this.publishMaxByteRate)) {
- publishRateExceeded = true;
- }
- }
- }
-
- @Override
- public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
- if (this.publishThrottlingEnabled) {
- this.currentPublishMsgCount.add(numOfMessages);
- this.currentPublishByteCount.add(msgSizeInBytes);
- }
- }
-
- @Override
- public boolean resetPublishCount() {
- if (this.publishThrottlingEnabled) {
- this.currentPublishMsgCount.reset();
- this.currentPublishByteCount.reset();
- this.publishRateExceeded = false;
- return true;
- }
- return false;
- }
-
- @Override
- public boolean isPublishRateExceeded() {
- return publishRateExceeded;
- }
-
- @Override
- public void update(Policies policies, String clusterName) {
- final PublishRate maxPublishRate = policies.publishMaxMessageRate !=
null
- ? policies.publishMaxMessageRate.get(clusterName)
- : null;
- update(maxPublishRate);
- }
-
- public void update(PublishRate maxPublishRate) {
- if (maxPublishRate != null
- && (maxPublishRate.publishThrottlingRateInMsg > 0 ||
maxPublishRate.publishThrottlingRateInByte > 0)) {
- this.publishThrottlingEnabled = true;
- this.publishMaxMessageRate =
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
- this.publishMaxByteRate =
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
- } else {
- this.publishMaxMessageRate = 0;
- this.publishMaxByteRate = 0;
- this.publishThrottlingEnabled = false;
- }
- resetPublishCount();
- }
-
- @Override
- public boolean tryAcquire(int numbers, long bytes) {
- return false;
- }
-}
-
-class PublishRateLimiterDisable implements PublishRateLimiter {
-
- public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new
PublishRateLimiterDisable();
-
- @Override
- public void checkPublishRate() {
- // No-op
- }
-
- @Override
- public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
- // No-op
- }
-
- @Override
- public boolean resetPublishCount() {
- // No-op
- return false;
- }
-
- @Override
- public boolean isPublishRateExceeded() {
- return false;
- }
-
- @Override
- public void update(Policies policies, String clusterName) {
- // No-op
- }
-
- @Override
- public void update(PublishRate maxPublishRate) {
- // No-op
- }
-
- @Override
- public boolean tryAcquire(int numbers, long bytes) {
- // No-op
- return false;
- }
-
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
new file mode 100644
index 0000000..0ff3866
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+
+public class PublishRateLimiterDisable implements PublishRateLimiter {
+
+ public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new
PublishRateLimiterDisable();
+
+ @Override
+ public void checkPublishRate() {
+ // No-op
+ }
+
+ @Override
+ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+ // No-op
+ }
+
+ @Override
+ public boolean resetPublishCount() {
+ // No-op
+ return false;
+ }
+
+ @Override
+ public boolean isPublishRateExceeded() {
+ return false;
+ }
+
+ @Override
+ public void update(Policies policies, String clusterName) {
+ // No-op
+ }
+
+ @Override
+ public void update(PublishRate maxPublishRate) {
+ // No-op
+ }
+
+ @Override
+ public boolean tryAcquire(int numbers, long bytes) {
+ // No-op
+ return false;
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
new file mode 100644
index 0000000..7e481ab
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+
+import java.util.concurrent.atomic.LongAdder;
+
+public class PublishRateLimiterImpl implements PublishRateLimiter {
+ protected volatile int publishMaxMessageRate = 0;
+ protected volatile long publishMaxByteRate = 0;
+ protected volatile boolean publishThrottlingEnabled = false;
+ protected volatile boolean publishRateExceeded = false;
+ protected volatile LongAdder currentPublishMsgCount = new LongAdder();
+ protected volatile LongAdder currentPublishByteCount = new LongAdder();
+
+ public PublishRateLimiterImpl(Policies policies, String clusterName) {
+ update(policies, clusterName);
+ }
+
+ public PublishRateLimiterImpl(PublishRate maxPublishRate) {
+ update(maxPublishRate);
+ }
+
+ @Override
+ public void checkPublishRate() {
+ if (this.publishThrottlingEnabled && !publishRateExceeded) {
+ long currentPublishMsgRate = this.currentPublishMsgCount.sum();
+ long currentPublishByteRate = this.currentPublishByteCount.sum();
+ if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate >
this.publishMaxMessageRate)
+ || (this.publishMaxByteRate > 0 && currentPublishByteRate
> this.publishMaxByteRate)) {
+ publishRateExceeded = true;
+ }
+ }
+ }
+
+ @Override
+ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+ if (this.publishThrottlingEnabled) {
+ this.currentPublishMsgCount.add(numOfMessages);
+ this.currentPublishByteCount.add(msgSizeInBytes);
+ }
+ }
+
+ @Override
+ public boolean resetPublishCount() {
+ if (this.publishThrottlingEnabled) {
+ this.currentPublishMsgCount.reset();
+ this.currentPublishByteCount.reset();
+ this.publishRateExceeded = false;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isPublishRateExceeded() {
+ return publishRateExceeded;
+ }
+
+ @Override
+ public void update(Policies policies, String clusterName) {
+ final PublishRate maxPublishRate = policies.publishMaxMessageRate !=
null
+ ? policies.publishMaxMessageRate.get(clusterName)
+ : null;
+ update(maxPublishRate);
+ }
+
+ public void update(PublishRate maxPublishRate) {
+ if (maxPublishRate != null
+ && (maxPublishRate.publishThrottlingRateInMsg > 0 ||
maxPublishRate.publishThrottlingRateInByte > 0)) {
+ this.publishThrottlingEnabled = true;
+ this.publishMaxMessageRate =
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+ this.publishMaxByteRate =
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+ } else {
+ this.publishMaxMessageRate = 0;
+ this.publishMaxByteRate = 0;
+ this.publishThrottlingEnabled = false;
+ }
+ resetPublishCount();
+ }
+
+ @Override
+ public boolean tryAcquire(int numbers, long bytes) {
+ return false;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1bb7866..6f9860b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -82,7 +82,10 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedEx
import
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.PrecisPublishLimiter;
import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.PublishRateLimiter;
+import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
@@ -2135,24 +2138,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
/**
- * Get {@link TopicPolicies} for this topic.
- * @param topicName
- * @return TopicPolicies is exist else return null.
- */
- public TopicPolicies getTopicPolicies(TopicName topicName) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- }
- try {
- return
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
- return null;
- }
- }
-
- /**
* Get message TTL for this topic.
* @param topicPolicies TopicPolicies
* @param policies NameSpace policy
@@ -2366,6 +2351,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
}
+
+ if (policies != null && policies.getPublishRate() != null) {
+ topicPolicyPublishRate = policies.getPublishRate();
+ updateTopicPublishDispatcher();
+ }
}
private void
initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
@@ -2381,6 +2371,32 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return this;
}
+ @Override
+ protected void updateTopicPublishDispatcher() {
+ if (topicPolicyPublishRate != null &&
(topicPolicyPublishRate.publishThrottlingRateInByte > 0
+ || topicPolicyPublishRate.publishThrottlingRateInMsg > 0)) {
+ log.info("Enabling topic policy publish rate limiting {} on topic
{}", topicPolicyPublishRate, this.topic);
+ if (!preciseTopicPublishRateLimitingEnable) {
+ this.brokerService.setupBrokerPublishRateLimiterMonitor();
+ }
+
+ if (this.topicPublishRateLimiter == null
+ || this.topicPublishRateLimiter ==
PublishRateLimiter.DISABLED_RATE_LIMITER) {
+ // create new rateLimiter if rate-limiter is disabled
+ if (preciseTopicPublishRateLimitingEnable) {
+ this.topicPublishRateLimiter = new
PrecisPublishLimiter(topicPolicyPublishRate, ()-> this.enableCnxAutoRead());
+ } else {
+ this.topicPublishRateLimiter = new
PublishRateLimiterImpl(topicPolicyPublishRate);
+ }
+ } else {
+ this.topicPublishRateLimiter.update(topicPolicyPublishRate);
+ }
+ } else {
+ log.info("Disabling publish throttling for {}", this.topic);
+ this.topicPublishRateLimiter =
PublishRateLimiter.DISABLED_RATE_LIMITER;
+ enableProducerReadForPublishRateLimiting();
+ }
+ }
@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index fa073fa..459a8e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
@@ -170,4 +171,24 @@ public class TopicPoliciesDisableTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
+
+ @Test
+ public void testPublishRateDisabled() throws Exception {
+ PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+ log.info("Publish Rate: {} will set to the topic: {}", publishRate,
testTopic);
+
+ try {
+ admin.topics().setPublishRate(testTopic, publishRate);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getPublishRate(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e216661..4eb656b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
@@ -415,4 +416,42 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
+
+ @Test
+ public void testGetSetPublishRate() throws Exception {
+ PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+ log.info("Publish Rate: {} will set to the topic: {}", publishRate,
testTopic);
+
+ admin.topics().setPublishRate(testTopic, publishRate);
+ log.info("Publish Rate set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
+ log.info("Publish Rate: {} get on topic: {}", getPublishRate,
testTopic);
+ Assert.assertEquals(getPublishRate, publishRate);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
+ public void testRemovePublishRate() throws Exception {
+ PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+ log.info("Publish Rate: {} will set to the topic: {}", publishRate,
testTopic);
+
+ admin.topics().setPublishRate(testTopic, publishRate);
+ log.info("Publish Rate set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
+ log.info("Publish Rate: {} get on topic: {}", getPublishRate,
testTopic);
+ Assert.assertEquals(getPublishRate, publishRate);
+
+ admin.topics().removePublishRate(testTopic);
+ Thread.sleep(3000);
+ getPublishRate = admin.topics().getPublishRate(testTopic);
+ log.info("Publish Rate get on topic: {} after remove", getPublishRate,
testTopic);
+ Assert.assertNull(getPublishRate);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 6ca2390..74d134f 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -39,9 +39,9 @@ import
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
-
/**
* Admin interface for Topics management.
*/
@@ -2007,4 +2007,64 @@ public interface Topics {
*/
CompletableFuture<Void> removeCompactionThresholdAsync(String topic);
+ /**
+ * Set message-publish-rate (topics can publish this many messages per
second).
+ *
+ * @param topic
+ * @param publishMsgRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setPublishRate(String topic, PublishRate publishMsgRate) throws
PulsarAdminException;
+
+ /**
+ * Set message-publish-rate (topics can publish this many messages per
second) asynchronously.
+ *
+ * @param topic
+ * @param publishMsgRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate
publishMsgRate);
+
+ /**
+ * Get message-publish-rate (topics can publish this many messages per
second).
+ *
+ * @param topic
+ * @return number of messages per second
+ * @throws PulsarAdminException Unexpected error
+ */
+ PublishRate getPublishRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Get message-publish-rate (topics can publish this many messages per
second) asynchronously.
+ *
+ * @param topic
+ * @return number of messages per second
+ */
+ CompletableFuture<PublishRate> getPublishRateAsync(String topic);
+
+ /**
+ * Remove message-publish-rate.
+ * <p/>
+ * Remove topic message publish rate
+ *
+ * @param topic
+ * @throws PulsarAdminException
+ * unexpected error
+ */
+ void removePublishRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove message-publish-rate asynchronously.
+ * <p/>
+ * Remove topic message publish rate
+ *
+ * @param topic
+ * @throws PulsarAdminException
+ * unexpected error
+ */
+ CompletableFuture<Void> removePublishRateAsync(String topic) throws
PulsarAdminException;
+
+
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ce4ce78..2711b2f 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -75,6 +75,7 @@ import
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
@@ -2079,5 +2080,81 @@ public class TopicsImpl extends BaseResource implements
Topics {
return asyncDeleteRequest(path);
}
- private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
+ @Override
+ public PublishRate getPublishRate(String topic) throws
PulsarAdminException {
+ try {
+ return getPublishRateAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "publishRate");
+ final CompletableFuture<PublishRate> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<PublishRate>() {
+ @Override
+ public void completed(PublishRate publishRate) {
+ future.complete(publishRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setPublishRate(String topic, PublishRate publishRate) throws
PulsarAdminException {
+ try {
+ setPublishRateAsync(topic, publishRate).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setPublishRateAsync(String topic,
PublishRate publishRate) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "publishRate");
+ return asyncPostRequest(path, Entity.entity(publishRate,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removePublishRate(String topic) throws PulsarAdminException {
+ try {
+ removePublishRateAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removePublishRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "publishRate");
+ return asyncDeleteRequest(path);
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 5b19576..df0d342 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -52,6 +52,7 @@ import
org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -126,6 +127,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-compaction-threshold", new
GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new
SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new
RemoveCompactionThreshold());
+ jcommander.addCommand("get-publish-rate", new GetPublishRate());
+ jcommander.addCommand("set-publish-rate", new SetPublishRate());
+ jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
}
@Parameters(commandDescription = "Get the list of topics under a
namespace.")
@@ -1207,4 +1211,49 @@ public class CmdTopics extends CmdBase {
admin.topics().removeCompactionThreshold(persistentTopic);
}
}
+
+ @Parameters(commandDescription = "Get publish rate for a topic")
+ private class GetPublishRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getPublishRate(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set publish rate for a topic")
+ private class SetPublishRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-publish-rate",
+ "-m" }, description = "message-publish-rate (default -1 will be
overwrite if not passed)\n", required = false)
+ private int msgPublishRate = -1;
+
+ @Parameter(names = { "--byte-publish-rate",
+ "-b" }, description = "byte-publish-rate (default -1 will be
overwrite if not passed)\n", required = false)
+ private long bytePublishRate = -1;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setPublishRate(persistentTopic,
+ new PublishRate(msgPublishRate, bytePublishRate));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove publish rate for a topic")
+ private class RemovePublishRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removePublishRate(persistentTopic);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index d56e283..33fbe80 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -51,6 +51,7 @@ public class TopicPolicies {
private Boolean delayedDeliveryEnabled = null;
private DispatchRate dispatchRate = null;
private Long compactionThreshold = null;
+ private PublishRate publishRate = null;
public boolean isMaxUnackedMessagesOnConsumerSet() {
return maxUnackedMessagesOnConsumer != null;
@@ -107,4 +108,8 @@ public class TopicPolicies {
public boolean isCompactionThresholdSet() {
return compactionThreshold != null;
}
+
+ public boolean isPublishRateSet() {
+ return publishRate != null;
+ }
}