This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e75f0a839d6a79aa27187c527b83df568e892853
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 10 20:15:12 2022 +0800

    Fix system topic replicate issue (#14605)
    
    ### Motivation
    PIP 92 has introduced topic policies across clusters. But after 
https://github.com/apache/pulsar/pull/12517, if the policy is not global, it 
set the replicate cluster to an empty set.
    ```
    PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
     if (policies == null || !policies.isGlobalPolicies()) {
         // we don't need to replicate local policies to remote cluster, so set 
`replicateTo` to empty.
         builder.replicateTo(new HashSet<>());
    }
    ```
    It should set the `replicateTo` with the local cluster, not an empty set.
    
    Otherwise,  it will cause the system event to be replicated. Details are 
here :
    
https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328
    
    (cherry picked from commit e470de54314483ccb4f4970e0d772c81c4bdb731)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  14 +-
 .../service/ReplicatorTopicPoliciesTest.java       | 174 ++++++++++++++++++---
 2 files changed, 165 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index bbb0257..e7af027 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +62,8 @@ import org.slf4j.LoggerFactory;
 public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesService {
 
     private final PulsarService pulsarService;
+    private final HashSet localCluster;
+    private final String clusterName;
     private volatile NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
     @VisibleForTesting
@@ -80,6 +83,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
+        this.clusterName = pulsarService.getConfiguration().getClusterName();
+        this.localCluster = Sets.newHashSet(clusterName);
     }
 
     @Override
@@ -143,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
         if (policies == null || !policies.isGlobalPolicies()) {
             // we don't need to replicate local policies to remote cluster, so 
set `replicateTo` to empty.
-            builder.replicateTo(new HashSet<>());
+            builder.replicateTo(localCluster);
         }
         return builder
                 .actionType(actionType)
@@ -454,9 +459,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    private static boolean hasReplicateTo(Message<?> message) {
+    private boolean hasReplicateTo(Message<?> message) {
         if (message instanceof MessageImpl) {
-            return ((MessageImpl<?>) message).hasReplicateTo();
+            return ((MessageImpl<?>) message).hasReplicateTo()
+                    ? (((MessageImpl<?>) message).getReplicateTo().size() == 1
+                        ? !((MessageImpl<?>) 
message).getReplicateTo().contains(clusterName) : true)
+                    : false;
         }
         if (message instanceof TopicMessageImpl) {
             return hasReplicateTo(((TopicMessageImpl<?>) 
message).getMessage());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 29f0b8e..8ec3e04 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -85,6 +85,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
         backlogQuota.setLimitSize(1);
         backlogQuota.setLimitTime(2);
+        // local
+        admin1.topicPolicies().setBacklogQuota(topic, backlogQuota);
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin2.topicPolicies().getBacklogQuotaMap(topic).size(), 0));
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(admin3.topicPolicies().getBacklogQuotaMap(topic).size(), 0));
+        // global
         admin1.topicPolicies(true).setBacklogQuota(topic, backlogQuota);
 
         Awaitility.await().untilAsserted(() ->
@@ -104,7 +111,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set message ttl
+        // local
+        admin1.topicPolicies().setMessageTTL(topic, 10);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMessageTTL(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMessageTTL(topic)));
+        // global
         admin1.topicPolicies(true).setMessageTTL(topic, 10);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
@@ -125,6 +138,10 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         init(namespace, topic);
         // set global topic policy
         SubscribeRate subscribeRate = new SubscribeRate(100, 10000);
+        // local
+        admin1.topicPolicies().setSubscribeRate(topic, subscribeRate);
+        untilRemoteClustersAsserted(admin -> 
assertNull(admin.topicPolicies().getSubscribeRate(topic)));
+        // global
         admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate);
 
         // get global topic policy
@@ -141,7 +158,10 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set global topic policy
+        // local
+        admin1.topicPolicies().setMaxMessageSize(topic, 1000);
+        untilRemoteClustersAsserted(admin -> 
assertNull(admin.topicPolicies().getMaxMessageSize(topic)));
+        // global
         admin1.topicPolicies(true).setMaxMessageSize(topic, 1000);
 
         // get global topic policy
@@ -160,6 +180,10 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         init(namespace, topic);
         // set global topic policy
         PublishRate publishRate = new PublishRate(100, 10000);
+        // local
+        admin1.topicPolicies().setPublishRate(topic, publishRate);
+        untilRemoteClustersAsserted(admin -> 
assertNull(admin.topicPolicies().getPublishRate(topic)));
+        // global
         admin1.topicPolicies(true).setPublishRate(topic, publishRate);
 
         // get global topic policy
@@ -176,7 +200,11 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set global topic policy
+        // local
+        admin1.topicPolicies().setDeduplicationSnapshotInterval(topic, 100);
+        untilRemoteClustersAsserted(
+                admin -> 
assertNull(admin.topicPolicies().getDeduplicationSnapshotInterval(topic)));
+        // global
         admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 
100);
 
         // get global topic policy
@@ -207,6 +235,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         init(namespace, topic);
         // set PersistencePolicies
         PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000);
+        // local
+        admin1.topicPolicies().setPersistence(topic, policies);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getPersistence(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getPersistence(topic)));
+        // global
         admin1.topicPolicies(true).setPersistence(topic, policies);
 
         Awaitility.await().untilAsserted(() ->
@@ -226,7 +261,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set subscription types policies
+        // local
+        admin1.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getDeduplicationStatus(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getDeduplicationStatus(topic)));
+        // global
         admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
@@ -238,7 +279,6 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
                 
assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
         Awaitility.await().untilAsserted(() ->
                 
assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
-
     }
 
     @Test
@@ -246,8 +286,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-
-        // set max producer policies
+        // local
+        admin1.topicPolicies().setMaxProducers(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxProducers(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxProducers(topic)));
+        // global
         admin1.topicPolicies(true).setMaxProducers(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), 
100));
@@ -268,7 +313,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
 
         init(namespace, topic);
-        // set max consumer per sub
+        // local
+        admin1.topicPolicies().setMaxConsumersPerSubscription(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getMaxConsumersPerSubscription(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getMaxConsumersPerSubscription(topic)));
+        // global
         admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(),
 100));
@@ -277,7 +328,6 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
 
         Awaitility.await().untilAsserted(() -> {
             
assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(),
 100);
-            
assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic));
         });
 
         //remove max consumer per sub
@@ -293,7 +343,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set max unacked msgs per consumers
+        // local
+        admin1.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)));
+        // global
         admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(),
 100));
@@ -315,6 +371,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         init(namespace, persistentTopicName);
         // set retention
         RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
+        // local
+        admin1.topicPolicies().setRetention(persistentTopicName, 
retentionPolicies);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getRetention(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getRetention(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setRetention(persistentTopicName, 
retentionPolicies);
 
         Awaitility.await().untilAsserted(() ->
@@ -324,7 +387,6 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
 
         Awaitility.await().untilAsserted(() -> {
             
assertEquals(admin1.topicPolicies(true).getRetention(persistentTopicName), 
retentionPolicies);
-            
assertNull(admin1.topicPolicies().getRetention(persistentTopicName));
         });
 
         //remove retention
@@ -341,7 +403,14 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         init(namespace, topic);
         Set<SubscriptionType> subscriptionTypes = new HashSet<>();
         subscriptionTypes.add(SubscriptionType.Shared);
-        // set subscription types policies
+        // local
+        admin1.topicPolicies().setSubscriptionTypesEnabled(topic, 
subscriptionTypes);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getSubscriptionTypesEnabled(topic), null));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getSubscriptionTypesEnabled(topic), null));
+
+        // global
         admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, 
subscriptionTypes);
         Awaitility.await().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), 
subscriptionTypes));
@@ -353,7 +422,6 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
                 
assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), 
Collections.emptySet()));
         Awaitility.await().untilAsserted(() ->
                 
assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), 
Collections.emptySet()));
-
     }
 
 
@@ -362,7 +430,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set max consumers
+        // local
+        admin1.topicPolicies().setMaxConsumers(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxConsumers(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxConsumers(topic)));
+        // global
         admin1.topicPolicies(true).setMaxConsumers(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), 
100));
@@ -389,6 +463,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
                 .ratePeriodInSecond(3)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        admin1.topicPolicies().setDispatchRate(persistentTopicName, 
dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getDispatchRate(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setDispatchRate(persistentTopicName, 
dispatchRate);
 
         // get dispatchRate
@@ -411,7 +492,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
         DelayedDeliveryPolicies policies = 
DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build();
-        // set delayed delivery
+        // local
+        admin1.topicPolicies().setDelayedDeliveryPolicy(topic, policies);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getDelayedDeliveryPolicy(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getDelayedDeliveryPolicy(topic)));
+        // global
         admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), 
policies));
@@ -434,6 +521,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         // set InactiveTopicPolicies
         InactiveTopicPolicies inactiveTopicPolicies =
                 new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true);
+        // local
+        admin1.topicPolicies().setInactiveTopicPolicies(persistentTopicName, 
inactiveTopicPolicies);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getInactiveTopicPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getInactiveTopicPolicies(persistentTopicName)));
+        // global
         
admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, 
inactiveTopicPolicies);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true)
@@ -462,6 +556,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
                 .dispatchThrottlingRateInByte(1)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        
admin1.topicPolicies().setSubscriptionDispatchRate(persistentTopicName, 
dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getSubscriptionDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getSubscriptionDispatchRate(persistentTopicName)));
+        // global
         
admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, 
dispatchRate);
         // get subscription dispatch rate
         Awaitility.await().untilAsserted(() ->
@@ -492,6 +593,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
                 .dispatchThrottlingRateInByte(1)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        admin1.topicPolicies().setReplicatorDispatchRate(persistentTopicName, 
dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getReplicatorDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getReplicatorDispatchRate(persistentTopicName)));
+        // global
         
admin1.topicPolicies(true).setReplicatorDispatchRate(persistentTopicName, 
dispatchRate);
         // get replicator dispatch rate
         Awaitility.await().untilAsserted(() ->
@@ -514,7 +622,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + 
UUID.randomUUID();
         init(namespace, topic);
-        // set max unacked msgs per sub
+        // local
+        admin1.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));
+        // global
         admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 
100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 
assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(),
 100));
@@ -534,7 +648,13 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String persistentTopicName = "persistent://" + namespace + 
"/topic" + UUID.randomUUID();
 
         init(namespace, persistentTopicName);
-        // set compaction threshold
+        // local
+        admin1.topicPolicies().setCompactionThreshold(persistentTopicName, 1);
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getCompactionThreshold(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getCompactionThreshold(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 
1);
         // get compaction threshold
         Awaitility.await().untilAsserted(() ->
@@ -557,8 +677,12 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + 
"/topic" + UUID.randomUUID();
         init(namespace, persistentTopicName);
+        // local
+        
admin1.topicPolicies().setMaxSubscriptionsPerTopic(persistentTopicName, 1024);
+        untilRemoteClustersAsserted(
+                admin -> 
assertNull(admin.topicPolicies().getMaxSubscriptionsPerTopic(persistentTopicName)));
 
-        //set max subscriptions per topic
+        // global
         
admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 
1024);
 
         //get max subscriptions per topic
@@ -581,8 +705,18 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         OffloadPoliciesImpl offloadPolicies =
                 OffloadPoliciesImpl.create("s3", "region", "bucket", 
"endpoint", null, null, null, null,
                 8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);
-
-        // set offload policies
+        // local
+        try {
+            admin1.topicPolicies().setOffloadPolicies(persistentTopicName, 
offloadPolicies);
+        } catch (Exception exception){
+            // driver not found exception.
+            assertTrue(exception instanceof 
PulsarAdminException.ServerSideErrorException);
+        }
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin2.topicPolicies().getOffloadPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(admin3.topicPolicies().getOffloadPolicies(persistentTopicName)));
+        // global
         try{
             admin1.topicPolicies(true).setOffloadPolicies(persistentTopicName, 
offloadPolicies);
         }catch (Exception exception){

Reply via email to