This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8439082f79c [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
8439082f79c is described below

commit 8439082f79c5480b58be93fb360ed07b68016631
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

    [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
    
    (cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62)
---
 .../pulsar/broker/service/AbstractTopic.java       | 52 +++++++++++++---------
 ...kerInternalClientConfigurationOverrideTest.java | 42 ++++++++++++++++-
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 5bbc30f7ed0..16aeabbcc75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -216,13 +216,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                     .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
         }
         topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
-        topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
-        topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
+        topicPolicies.getMaxUnackedMessagesOnConsumer()
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
-        topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
-        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
-        topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
+        topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
+        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
+        topicPolicies.getMaxConsumersPerSubscription()
+                .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
         topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
         topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
@@ -233,8 +236,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
                 this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
                         data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
-        topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-        topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+        topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+        topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
         topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -261,15 +264,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getReplicationClusters().updateNamespaceValue(
                 new ArrayList<>(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
         topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-        topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-        topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-        topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-        topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+        topicPolicies.getMessageTTLInSeconds()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+        topicPolicies.getMaxProducersPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+        topicPolicies.getMaxConsumerPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
         topicPolicies.getMaxConsumersPerSubscription()
-                .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
         topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
         topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -299,6 +306,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         updateEntryFilters();
     }
 
+    private Integer normalizeValue(Integer policyValue) {
+        return policyValue != null && policyValue < 0 ? null : policyValue;
+    }
+
     private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
         DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
         if (dispatchRate == null) {
@@ -357,12 +368,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
         topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
         topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
-        topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
-                config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
-        topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
-                config.getBrokerDeduplicationSnapshotIntervalSeconds());
-        topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
+        topicPolicies.getRetentionPolicies().updateBrokerValue(
+                new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
+        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
+                .updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
+        topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
         topicPolicies.getMaxUnackedMessagesOnSubscription()
                 .updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
         //init backlogQuota
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
index 1b1b383e930..f33202c3c40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
+import lombok.Cleanup;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -112,4 +116,40 @@ public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBas
         Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
     }
 
+    @Test
+    public void testOldNamespacePolicy() throws Exception {
+        
+        String ns = "prop/oldNsWithDefaultNonNullValues";
+        String topic = "persistent://" + ns + "/t1";
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = -1;
+        policies.max_consumers_per_topic = -1;
+        policies.max_producers_per_topic = -1;
+        policies.max_subscriptions_per_topic = -1;
+        policies.max_topics_per_namespace = -1;
+        policies.max_unacked_messages_per_consumer = -1;
+        policies.max_unacked_messages_per_subscription = -1;
+        admin.namespaces().createNamespace(ns, policies);
+        
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic).create();
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+                conf.getMaxUnackedMessagesPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
+                conf.getMaxConsumersPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumerPerTopic().get(),
+                conf.getMaxConsumersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxProducersPerTopic().get(),
+                conf.getMaxProducersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxSubscriptionsPerTopic().get(),
+                conf.getMaxSubscriptionsPerTopic());
+        assertEquals(topicRef.topicPolicies.getTopicMaxMessageSize().get(),
+                conf.getMaxMessageSize());
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnConsumer().get(),
+                conf.getMaxUnackedMessagesPerConsumer());
+        
+        
+    }
 }

Reply via email to