This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c11202c827f Optimize topic policy with HierarchyTopicPolicies about
replicatorDispatchRate (#14161)
c11202c827f is described below
commit c11202c827ff5ab22ed9cb58e133ef1ee9830803
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Mon Feb 28 09:46:06 2022 +0800
Optimize topic policy with HierarchyTopicPolicies about
replicatorDispatchRate (#14161)
(cherry picked from commit 2b3e8aeb5a1c259e0325e5a91dc5d7e20c6ee569)
---
.../pulsar/broker/service/AbstractTopic.java | 30 ++++++++++++++++++++--
.../pulsar/broker/service/BrokerService.java | 12 +++++----
.../apache/pulsar/broker/service/Replicator.java | 3 +--
.../service/persistent/DispatchRateLimiter.java | 3 +++
.../service/persistent/PersistentReplicator.java | 9 +++----
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/service/ServerCnxTest.java | 15 +++++++++++
.../policies/data/HierarchyTopicPolicies.java | 2 ++
8 files changed, 61 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4ed89908651..286eccb8bd6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -161,6 +161,10 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}
+ public DispatchRateImpl getReplicatorDispatchRate() {
+ return this.topicPolicies.getReplicatorDispatchRate().get();
+ }
+
private SchemaCompatibilityStrategy
formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null :
strategy;
}
@@ -194,6 +198,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
@@ -233,6 +238,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
+ updateNamespaceReplicatorDispatchRate(namespacePolicies,
+ brokerService.getPulsar().getConfig().getClusterName());
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map,
type)));
@@ -246,6 +253,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
}
+ private void updateNamespaceReplicatorDispatchRate(Policies
namespacePolicies, String cluster) {
+ topicPolicies.getReplicatorDispatchRate()
+
.updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
+ }
+
private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
@@ -315,6 +327,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
SchemaCompatibilityStrategy schemaCompatibilityStrategy =
config.getSchemaCompatibilityStrategy();
+
topicPolicies.getReplicatorDispatchRate().updateBrokerValue(replicatorDispatchRateInBroker(config));
if (isSystemTopic()) {
schemaCompatibilityStrategy =
config.getSystemTopicSchemaCompatibilityStrategy();
}
@@ -331,6 +344,14 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
.build();
}
+ private DispatchRateImpl
replicatorDispatchRateInBroker(ServiceConfiguration config) {
+ return DispatchRateImpl.builder()
+
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
+
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
+ .ratePeriodInSecond(1)
+ .build();
+ }
+
private EnumSet<SubType> subTypeStringsToEnumSet(Set<String>
getSubscriptionTypesEnabled) {
EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class);
for (String subTypeStr :
CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
@@ -1123,8 +1144,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
public void updateBrokerSubscriptionDispatchRate() {
- topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
-
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+ topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
+
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
public void addFilteredEntriesCount(int filtered) {
@@ -1134,4 +1155,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
public long getFilteredEntriesCount() {
return this.filteredEntriesCounter.longValue();
}
+
+ public void updateBrokerReplicatorDispatchRate() {
+ topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
+
replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e05bd7bd9c2..cda3d90fd98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2313,12 +2313,14 @@ public class BrokerService implements Closeable {
private void updateReplicatorMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic Replicator in Geo-replication
- forEachTopic(topic ->
- topic.getReplicators().forEach((name, persistentReplicator) ->
{
- if (persistentReplicator.getRateLimiter().isPresent()) {
-
persistentReplicator.getRateLimiter().get().updateDispatchRate();
+ forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic)
topic).updateBrokerReplicatorDispatchRate();
}
- }));
+ topic.getReplicators().forEach((name,
persistentReplicator) ->
+
persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+ }
+ );
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 5f738ac1937..2cd6ec62327 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
public interface Replicator {
@@ -38,7 +37,7 @@ public interface Replicator {
String getRemoteCluster();
- default void initializeDispatchRateLimiterIfNeeded(Optional<Policies>
policies) {
+ default void initializeDispatchRateLimiterIfNeeded() {
//No-op
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 97a10631067..3247494577a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -173,6 +173,9 @@ public class DispatchRateLimiter {
case SUBSCRIPTION:
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
+ case REPLICATOR:
+ updateDispatchRate(topic.getReplicatorDispatchRate());
+ return;
}
Optional<DispatchRate> dispatchRate =
getTopicPolicyDispatchRate(brokerService, topicName, type);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 1f208bc768a..cc5410dbbeb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -57,7 +57,6 @@ import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
@@ -123,7 +122,7 @@ public class PersistentReplicator extends AbstractReplicator
readMaxSizeBytes =
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
producerQueueThreshold = (int) (producerQueueSize * 0.9);
- this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+ this.initializeDispatchRateLimiterIfNeeded();
startProducer();
}
@@ -705,9 +704,9 @@ public class PersistentReplicator extends AbstractReplicator
}
@Override
- public void initializeDispatchRateLimiterIfNeeded(Optional<Policies>
policies) {
- if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
- .isDispatchRateNeeded(topic.getBrokerService(), policies,
topic.getName(), Type.REPLICATOR)) {
+ public void initializeDispatchRateLimiterIfNeeded() {
+ if (!dispatchRateLimiter.isPresent()
+ &&
DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, Type.REPLICATOR));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 629158c4d50..27bb77fbc5d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -395,7 +395,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// dispatch rate limiter for each replicator
replicators.forEach((name, replicator) ->
- replicator.initializeDispatchRateLimiterIfNeeded(policies));
+ replicator.initializeDispatchRateLimiterIfNeeded());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index afa5d1aad03..a7fbd070f1d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1397,6 +1397,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ //
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1429,6 +1432,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ //
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1465,6 +1471,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ //
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1499,6 +1508,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ //
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1539,6 +1551,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ //
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 8546c83f0e4..917b9753cc3 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -51,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<PublishRate> publishRate;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
+ final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy>
schemaCompatibilityStrategy;
@@ -77,6 +78,7 @@ public class HierarchyTopicPolicies {
publishRate = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+ replicatorDispatchRate = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();