This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e75f0a839d6a79aa27187c527b83df568e892853 Author: Jiwei Guo <[email protected]> AuthorDate: Thu Mar 10 20:15:12 2022 +0800 Fix system topic replicate issue (#14605) ### Motivation PIP 92 has introduced topic policies across clusters. But after https://github.com/apache/pulsar/pull/12517, if the policy is not global, it set the replicate cluster to an empty set. ``` PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. builder.replicateTo(new HashSet<>()); } ``` It should set the `replicateTo` with the local cluster, not an empty set. Otherwise, it will cause the system event to be replicated. Details are here : https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328 (cherry picked from commit e470de54314483ccb4f4970e0d772c81c4bdb731) --- .../SystemTopicBasedTopicPoliciesService.java | 14 +- .../service/ReplicatorTopicPoliciesTest.java | 174 ++++++++++++++++++--- 2 files changed, 165 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index bbb0257..e7af027 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -61,6 +62,8 @@ import org.slf4j.LoggerFactory; public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { private final PulsarService pulsarService; + private final HashSet localCluster; + private final String clusterName; private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; @VisibleForTesting @@ -80,6 +83,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.pulsarService = pulsarService; + this.clusterName = pulsarService.getConfiguration().getClusterName(); + this.localCluster = Sets.newHashSet(clusterName); } @Override @@ -143,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. - builder.replicateTo(new HashSet<>()); + builder.replicateTo(localCluster); } return builder .actionType(actionType) @@ -454,9 +459,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } - private static boolean hasReplicateTo(Message<?> message) { + private boolean hasReplicateTo(Message<?> message) { if (message instanceof MessageImpl) { - return ((MessageImpl<?>) message).hasReplicateTo(); + return ((MessageImpl<?>) message).hasReplicateTo() + ? (((MessageImpl<?>) message).getReplicateTo().size() == 1 + ? !((MessageImpl<?>) message).getReplicateTo().contains(clusterName) : true) + : false; } if (message instanceof TopicMessageImpl) { return hasReplicateTo(((TopicMessageImpl<?>) message).getMessage()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index 29f0b8e..8ec3e04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -85,6 +85,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl(); backlogQuota.setLimitSize(1); backlogQuota.setLimitTime(2); + // local + admin1.topicPolicies().setBacklogQuota(topic, backlogQuota); + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.topicPolicies().getBacklogQuotaMap(topic).size(), 0)); + Awaitility.await().untilAsserted(() -> + assertEquals(admin3.topicPolicies().getBacklogQuotaMap(topic).size(), 0)); + // global admin1.topicPolicies(true).setBacklogQuota(topic, backlogQuota); Awaitility.await().untilAsserted(() -> @@ -104,7 +111,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set message ttl + // local + admin1.topicPolicies().setMessageTTL(topic, 10); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMessageTTL(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMessageTTL(topic))); + // global admin1.topicPolicies(true).setMessageTTL(topic, 10); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10)); @@ -125,6 +138,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { init(namespace, topic); // set global topic policy SubscribeRate subscribeRate = new SubscribeRate(100, 10000); + // local + admin1.topicPolicies().setSubscribeRate(topic, subscribeRate); + untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getSubscribeRate(topic))); + // global admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate); // get global topic policy @@ -141,7 +158,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set global topic policy + // local + admin1.topicPolicies().setMaxMessageSize(topic, 1000); + untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getMaxMessageSize(topic))); + // global admin1.topicPolicies(true).setMaxMessageSize(topic, 1000); // get global topic policy @@ -160,6 +180,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { init(namespace, topic); // set global topic policy PublishRate publishRate = new PublishRate(100, 10000); + // local + admin1.topicPolicies().setPublishRate(topic, publishRate); + untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getPublishRate(topic))); + // global admin1.topicPolicies(true).setPublishRate(topic, publishRate); // get global topic policy @@ -176,7 +200,11 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set global topic policy + // local + admin1.topicPolicies().setDeduplicationSnapshotInterval(topic, 100); + untilRemoteClustersAsserted( + admin -> assertNull(admin.topicPolicies().getDeduplicationSnapshotInterval(topic))); + // global admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 100); // get global topic policy @@ -207,6 +235,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { init(namespace, topic); // set PersistencePolicies PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000); + // local + admin1.topicPolicies().setPersistence(topic, policies); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getPersistence(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getPersistence(topic))); + // global admin1.topicPolicies(true).setPersistence(topic, policies); Awaitility.await().untilAsserted(() -> @@ -226,7 +261,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set subscription types policies + // local + admin1.topicPolicies().setDeduplicationStatus(topic, true); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getDeduplicationStatus(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getDeduplicationStatus(topic))); + // global admin1.topicPolicies(true).setDeduplicationStatus(topic, true); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic))); @@ -238,7 +279,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic))); Awaitility.await().untilAsserted(() -> assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic))); - } @Test @@ -246,8 +286,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - - // set max producer policies + // local + admin1.topicPolicies().setMaxProducers(topic, 100); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMaxProducers(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMaxProducers(topic))); + // global admin1.topicPolicies(true).setMaxProducers(topic, 100); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), 100)); @@ -268,7 +313,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set max consumer per sub + // local + admin1.topicPolicies().setMaxConsumersPerSubscription(topic, 100); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMaxConsumersPerSubscription(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMaxConsumersPerSubscription(topic))); + // global admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100)); @@ -277,7 +328,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { Awaitility.await().untilAsserted(() -> { assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100); - assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic)); }); //remove max consumer per sub @@ -293,7 +343,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set max unacked msgs per consumers + // local + admin1.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 100); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnConsumer(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnConsumer(topic))); + // global admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100)); @@ -315,6 +371,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { init(namespace, persistentTopicName); // set retention RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1); + // local + admin1.topicPolicies().setRetention(persistentTopicName, retentionPolicies); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getRetention(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getRetention(persistentTopicName))); + // global admin1.topicPolicies(true).setRetention(persistentTopicName, retentionPolicies); Awaitility.await().untilAsserted(() -> @@ -324,7 +387,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { Awaitility.await().untilAsserted(() -> { assertEquals(admin1.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies); - assertNull(admin1.topicPolicies().getRetention(persistentTopicName)); }); //remove retention @@ -341,7 +403,14 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { init(namespace, topic); Set<SubscriptionType> subscriptionTypes = new HashSet<>(); subscriptionTypes.add(SubscriptionType.Shared); - // set subscription types policies + // local + admin1.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypes); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getSubscriptionTypesEnabled(topic), null)); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getSubscriptionTypesEnabled(topic), null)); + + // global admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes); Awaitility.await().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes)); @@ -353,7 +422,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet())); Awaitility.await().untilAsserted(() -> assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet())); - } @@ -362,7 +430,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set max consumers + // local + admin1.topicPolicies().setMaxConsumers(topic, 100); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMaxConsumers(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMaxConsumers(topic))); + // global admin1.topicPolicies(true).setMaxConsumers(topic, 100); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), 100)); @@ -389,6 +463,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { .ratePeriodInSecond(3) .relativeToPublishRate(true) .build(); + // local + admin1.topicPolicies().setDispatchRate(persistentTopicName, dispatchRate); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getDispatchRate(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getDispatchRate(persistentTopicName))); + // global admin1.topicPolicies(true).setDispatchRate(persistentTopicName, dispatchRate); // get dispatchRate @@ -411,7 +492,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); DelayedDeliveryPolicies policies = DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build(); - // set delayed delivery + // local + admin1.topicPolicies().setDelayedDeliveryPolicy(topic, policies); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getDelayedDeliveryPolicy(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getDelayedDeliveryPolicy(topic))); + // global admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies)); @@ -434,6 +521,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { // set InactiveTopicPolicies InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true); + // local + admin1.topicPolicies().setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getInactiveTopicPolicies(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getInactiveTopicPolicies(persistentTopicName))); + // global admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies); Awaitility.await().untilAsserted(() -> assertEquals(admin2.topicPolicies(true) @@ -462,6 +556,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { .dispatchThrottlingRateInByte(1) .relativeToPublishRate(true) .build(); + // local + admin1.topicPolicies().setSubscriptionDispatchRate(persistentTopicName, dispatchRate); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getSubscriptionDispatchRate(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getSubscriptionDispatchRate(persistentTopicName))); + // global admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate); // get subscription dispatch rate Awaitility.await().untilAsserted(() -> @@ -492,6 +593,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { .dispatchThrottlingRateInByte(1) .relativeToPublishRate(true) .build(); + // local + admin1.topicPolicies().setReplicatorDispatchRate(persistentTopicName, dispatchRate); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getReplicatorDispatchRate(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getReplicatorDispatchRate(persistentTopicName))); + // global admin1.topicPolicies(true).setReplicatorDispatchRate(persistentTopicName, dispatchRate); // get replicator dispatch rate Awaitility.await().untilAsserted(() -> @@ -514,7 +622,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, topic); - // set max unacked msgs per sub + // local + admin1.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 100); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnSubscription(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnSubscription(topic))); + // global admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100); Awaitility.await().ignoreExceptions().untilAsserted(() -> assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100)); @@ -534,7 +648,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, persistentTopicName); - // set compaction threshold + // local + admin1.topicPolicies().setCompactionThreshold(persistentTopicName, 1); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getCompactionThreshold(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getCompactionThreshold(persistentTopicName))); + // global admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 1); // get compaction threshold Awaitility.await().untilAsserted(() -> @@ -557,8 +677,12 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); init(namespace, persistentTopicName); + // local + admin1.topicPolicies().setMaxSubscriptionsPerTopic(persistentTopicName, 1024); + untilRemoteClustersAsserted( + admin -> assertNull(admin.topicPolicies().getMaxSubscriptionsPerTopic(persistentTopicName))); - //set max subscriptions per topic + // global admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024); //get max subscriptions per topic @@ -581,8 +705,18 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null, 8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST); - - // set offload policies + // local + try { + admin1.topicPolicies().setOffloadPolicies(persistentTopicName, offloadPolicies); + } catch (Exception exception){ + // driver not found exception. + assertTrue(exception instanceof PulsarAdminException.ServerSideErrorException); + } + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies().getOffloadPolicies(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies().getOffloadPolicies(persistentTopicName))); + // global try{ admin1.topicPolicies(true).setOffloadPolicies(persistentTopicName, offloadPolicies); }catch (Exception exception){
