This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53119d01fb0 [improve][broker] PIP427:Align pulsar-admin Default for 
Mark-Delete Rate with Broker Configuration (#24470)
53119d01fb0 is described below

commit 53119d01fb01c2c25e31a5e80a2ee663d517b088
Author: Penghui Li <[email protected]>
AuthorDate: Mon Jul 7 19:28:13 2025 -0700

    [improve][broker] PIP427:Align pulsar-admin Default for Mark-Delete Rate 
with Broker Configuration (#24470)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 214 +++++++++++++++++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 194 +++++++++++++++++++
 .../common/policies/data/PersistencePolicies.java  |   2 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |   8 +-
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  12 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |   9 +-
 .../pulsar/admin/cli/CmdTopicPoliciesTest.java     |  97 ++++++++++
 .../org/apache/pulsar/admin/cli/TestCmdTopics.java |  41 ++++
 9 files changed, 562 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 359c0daf5b8..6124c913b26 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2056,7 +2056,9 @@ public class BrokerService implements Closeable {
                 }
             }
 
-            
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+            
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
 >= 0
+                    ? persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
+                    : 
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit());
             
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
             
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 8586b54e20d..49faa19fa09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -654,6 +654,220 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         assertNotNull(topic);
     }
 
+    /**
+     * Test namespace level persistence policies and verify that the internal 
managed ledger
+     * has applied the correct managedLedgerMaxMarkDeleteRate.
+     * 
+     * This test verifies:
+     * 1. Setting namespace persistence policies with 
managedLedgerMaxMarkDeleteRate
+     * 2. Creating a topic and verifying the managed ledger config is applied 
correctly
+     * 3. Verifying that cursors have the correct throttle mark delete rate
+     * 4. Testing policy updates and verifying they are applied to existing 
and newly created topics
+     * 5. Testing managedLedgerMaxMarkDeleteRate=-1 which should fall back to 
broker defaults
+     * 6. Testing policy removal and fallback to broker defaults
+     *
+     * @throws Exception
+     */
+    @Test()
+    public void 
testNamespacePersistencePoliciesWithManagedLedgerMaxMarkDeleteRate() throws 
Exception {
+        final String namespace = newUniqueName(defaultTenant + 
"/ns-persistence-test");
+        final String topicName = "persistent://" + namespace + "/test-topic";
+        
+        // Create namespace
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        
+        // Test 1: Set initial persistence policies with 
managedLedgerMaxMarkDeleteRate
+        final double initialMarkDeleteRate = 25.0;
+        final int initialEnsembleSize = 3;
+        final int initialWriteQuorum = 2;
+        final int initialAckQuorum = 1;
+        
+        PersistencePolicies initialPolicies = new PersistencePolicies(
+            initialEnsembleSize, initialWriteQuorum, initialAckQuorum, 
initialMarkDeleteRate);
+        
+        admin.namespaces().setPersistence(namespace, initialPolicies);
+        
+        // Verify the policies are set correctly
+        PersistencePolicies retrievedPolicies = 
admin.namespaces().getPersistence(namespace);
+        assertEquals(retrievedPolicies, initialPolicies);
+        assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
initialMarkDeleteRate);
+        
+        // Test 2: Create a topic and verify managed ledger config is applied
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .subscribe();
+        
+        // Get the topic and managed ledger
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+        
+        // Verify managed ledger config has the correct settings
+        assertEquals(managedLedger.getConfig().getEnsembleSize(), 
initialEnsembleSize);
+        assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
initialWriteQuorum);
+        assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
initialAckQuorum);
+        assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
initialMarkDeleteRate);
+        
+        // Test 3: Verify cursor has the correct throttle mark delete rate
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.getCursors().iterator().next();
+        assertEquals(cursor.getThrottleMarkDelete(), initialMarkDeleteRate);
+        
+        // Test 4: Update persistence policies and verify they are applied
+        final double updatedMarkDeleteRate = 75.0;
+        final int updatedEnsembleSize = 5;
+        final int updatedWriteQuorum = 3;
+        final int updatedAckQuorum = 2;
+        
+        PersistencePolicies updatedPolicies = new PersistencePolicies(
+            updatedEnsembleSize, updatedWriteQuorum, updatedAckQuorum, 
updatedMarkDeleteRate);
+        
+        admin.namespaces().setPersistence(namespace, updatedPolicies);
+
+        // Verify the policies are updated correctly
+        retrievedPolicies = admin.namespaces().getPersistence(namespace);
+        assertEquals(retrievedPolicies, updatedPolicies);
+        assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
updatedMarkDeleteRate);
+        
+        // Wait for the managed ledger config to be updated
+        retryStrategically((test) -> 
+            managedLedger.getConfig().getEnsembleSize() == updatedEnsembleSize 
&&
+            managedLedger.getConfig().getWriteQuorumSize() == 
updatedWriteQuorum &&
+            managedLedger.getConfig().getAckQuorumSize() == updatedAckQuorum &&
+            managedLedger.getConfig().getThrottleMarkDelete() == 
updatedMarkDeleteRate &&
+            cursor.getThrottleMarkDelete() == updatedMarkDeleteRate, 
+            10, 200);
+        
+        // Verify managed ledger config has been updated
+        assertEquals(managedLedger.getConfig().getEnsembleSize(), 
updatedEnsembleSize);
+        assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
updatedWriteQuorum);
+        assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
updatedAckQuorum);
+        assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
updatedMarkDeleteRate);
+        
+        // Verify cursor throttle mark delete rate has been updated
+        assertEquals(cursor.getThrottleMarkDelete(), updatedMarkDeleteRate);
+        
+        // Test 5: Create a new topic and verify it uses the updated policies
+        final String newTopicName = "persistent://" + namespace + 
"/new-test-topic";
+        @Cleanup
+        Producer<byte[]> newProducer = pulsarClient.newProducer()
+            .topic(newTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        @Cleanup
+        Consumer<byte[]> newConsumer = pulsarClient.newConsumer()
+            .topic(newTopicName)
+            .subscriptionName("new-test-sub")
+            .subscribe();
+        
+        PersistentTopic newTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(newTopicName).get();
+        ManagedLedgerImpl newManagedLedger = (ManagedLedgerImpl) 
newTopic.getManagedLedger();
+        ManagedCursorImpl newCursor = (ManagedCursorImpl) 
newManagedLedger.getCursors().iterator().next();
+        
+        // Verify new topic uses updated policies
+        assertEquals(newManagedLedger.getConfig().getEnsembleSize(), 
updatedEnsembleSize);
+        assertEquals(newManagedLedger.getConfig().getWriteQuorumSize(), 
updatedWriteQuorum);
+        assertEquals(newManagedLedger.getConfig().getAckQuorumSize(), 
updatedAckQuorum);
+        assertEquals(newManagedLedger.getConfig().getThrottleMarkDelete(), 
updatedMarkDeleteRate);
+        assertEquals(newCursor.getThrottleMarkDelete(), updatedMarkDeleteRate);
+        
+        // Test 6: Test managedLedgerMaxMarkDeleteRate=-1 which should fall 
back to broker defaults
+        final double fallbackMarkDeleteRate = -1.0;
+        final int fallbackEnsembleSize = 4;
+        final int fallbackWriteQuorum = 2;
+        final int fallbackAckQuorum = 1;
+        
+        PersistencePolicies fallbackPolicies = new PersistencePolicies(
+            fallbackEnsembleSize, fallbackWriteQuorum, fallbackAckQuorum, 
fallbackMarkDeleteRate);
+        
+        admin.namespaces().setPersistence(namespace, fallbackPolicies);
+        
+        // Verify the policies are set correctly
+        retrievedPolicies = admin.namespaces().getPersistence(namespace);
+        assertEquals(retrievedPolicies, fallbackPolicies);
+        assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
fallbackMarkDeleteRate);
+        
+        // Create a topic to verify it uses broker defaults for mark delete 
rate
+        final String fallbackTopicName = "persistent://" + namespace + 
"/fallback-test-topic";
+        @Cleanup
+        Producer<byte[]> fallbackProducer = pulsarClient.newProducer()
+            .topic(fallbackTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        @Cleanup
+        Consumer<byte[]> fallbackConsumer = pulsarClient.newConsumer()
+            .topic(fallbackTopicName)
+            .subscriptionName("fallback-test-sub")
+            .subscribe();
+        
+        PersistentTopic fallbackTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(fallbackTopicName).get();
+        ManagedLedgerImpl fallbackManagedLedger = (ManagedLedgerImpl) 
fallbackTopic.getManagedLedger();
+        ManagedCursorImpl fallbackCursor = (ManagedCursorImpl) 
fallbackManagedLedger.getCursors().iterator().next();
+        
+        // Verify new topic uses fallback policies (broker defaults for mark 
delete rate)
+        assertEquals(fallbackManagedLedger.getConfig().getEnsembleSize(), 
fallbackEnsembleSize);
+        assertEquals(fallbackManagedLedger.getConfig().getWriteQuorumSize(), 
fallbackWriteQuorum);
+        assertEquals(fallbackManagedLedger.getConfig().getAckQuorumSize(), 
fallbackAckQuorum);
+        
assertEquals(fallbackManagedLedger.getConfig().getThrottleMarkDelete(), 
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(fallbackCursor.getThrottleMarkDelete(), 
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        
+        // Test 7: Remove namespace persistence policies and verify fallback 
to broker defaults
+        admin.namespaces().removePersistence(namespace);
+        
+        // Verify policies are removed
+        assertNull(admin.namespaces().getPersistence(namespace));
+        
+        // Create another topic to verify it uses broker defaults
+        final String defaultTopicName = "persistent://" + namespace + 
"/default-test-topic";
+        @Cleanup
+        Producer<byte[]> defaultProducer = pulsarClient.newProducer()
+            .topic(defaultTopicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        @Cleanup
+        Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer()
+            .topic(defaultTopicName)
+            .subscriptionName("default-test-sub")
+            .subscribe();
+        
+        PersistentTopic defaultTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(defaultTopicName).get();
+        ManagedLedgerImpl defaultManagedLedger = (ManagedLedgerImpl) 
defaultTopic.getManagedLedger();
+        ManagedCursorImpl defaultCursor = (ManagedCursorImpl) 
defaultManagedLedger.getCursors().iterator().next();
+        
+        // Verify new topic uses broker defaults
+        assertEquals(defaultManagedLedger.getConfig().getEnsembleSize(), 
+            pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize());
+        assertEquals(defaultManagedLedger.getConfig().getWriteQuorumSize(), 
+            pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum());
+        assertEquals(defaultManagedLedger.getConfig().getAckQuorumSize(), 
+            pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum());
+        assertEquals(defaultManagedLedger.getConfig().getThrottleMarkDelete(), 
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(defaultCursor.getThrottleMarkDelete(), 
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        
+        // Test 8: Verify that existing topic uses broker defaults
+        assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(cursor.getThrottleMarkDelete(),
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(newManagedLedger.getConfig().getThrottleMarkDelete(),
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+        assertEquals(newCursor.getThrottleMarkDelete(),
+            
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+    }
+
     private void unloadTopic(String topicName) throws Exception {
         admin.topics().unload(topicName);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index dc412eef802..4bf3cfcb44b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -50,6 +50,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -69,6 +70,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -3883,4 +3885,196 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
             admin.topics().deletePartitionedTopic(topic, false);
         }
     }
+
+    /**
+     * Test topic level persistence policies and verify that the internal 
managed ledger
+     * has applied the correct managedLedgerMaxMarkDeleteRate.
+     *
+     * This test verifies:
+     * 1. Setting topic persistence policies with 
managedLedgerMaxMarkDeleteRate
+     * 2. Creating a topic and verifying the managed ledger config is applied 
correctly
+     * 3. Verifying that cursors have the correct throttle mark delete rate
+     * 4. Testing policy updates and verifying they are applied to existing 
topics
+     * 5. Testing managedLedgerMaxMarkDeleteRate=-1 which should fall back to 
broker defaults
+     * 6. Testing topic-level precedence over namespace level, then removal 
falling back to namespace/broker defaults
+     *
+     * @throws Exception
+     */
+    @Test
+    public void 
testTopicPersistencePoliciesWithManagedLedgerMaxMarkDeleteRate() throws 
Exception {
+        final String topicName = "persistent://" + myNamespace + 
"/test-topic-persistence-policies";
+        
+        // Create topic
+        admin.topics().createNonPartitionedTopic(topicName);
+        
+        // Test 1: Set initial topic persistence policies with 
managedLedgerMaxMarkDeleteRate
+        final double initialMarkDeleteRate = 25;
+        final int initialEnsembleSize = 4;
+        final int initialWriteQuorum = 3;
+        final int initialAckQuorum = 2;
+        
+        PersistencePolicies initialPolicies = new PersistencePolicies(
+            initialEnsembleSize, initialWriteQuorum, initialAckQuorum, 
initialMarkDeleteRate);
+        
+        admin.topicPolicies().setPersistence(topicName, initialPolicies);
+        
+        // Verify the policies are set correctly
+        Awaitility.await().untilAsserted(() -> {
+            PersistencePolicies retrievedPolicies = 
admin.topicPolicies().getPersistence(topicName);
+            assertEquals(retrievedPolicies, initialPolicies);
+            
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
initialMarkDeleteRate);
+        });
+        
+        // Test 2: Create producer and consumer to load the topic and verify 
managed ledger config
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .subscribe();
+        
+        // Get the topic and managed ledger
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+        
+        // Verify managed ledger config has the correct settings
+        assertEquals(managedLedger.getConfig().getEnsembleSize(), 
initialEnsembleSize);
+        assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
initialWriteQuorum);
+        assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
initialAckQuorum);
+        assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
initialMarkDeleteRate, 0.0001);
+        
+        // Test 3: Verify cursor has the correct throttle mark delete rate
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.getCursors().iterator().next();
+        assertEquals(cursor.getThrottleMarkDelete(), initialMarkDeleteRate, 
0.0001);
+        
+        // Test 4: Update topic persistence policies and verify they are 
applied
+        final double updatedMarkDeleteRate = 75;
+        final int updatedEnsembleSize = 3;
+        final int updatedWriteQuorum = 2;
+        final int updatedAckQuorum = 1;
+        
+        PersistencePolicies updatedPolicies = new PersistencePolicies(
+            updatedEnsembleSize, updatedWriteQuorum, updatedAckQuorum, 
updatedMarkDeleteRate);
+        
+        admin.topicPolicies().setPersistence(topicName, updatedPolicies);
+
+        // Verify the policies are updated correctly
+        Awaitility.await().untilAsserted(() -> {
+            PersistencePolicies retrievedPolicies = 
admin.topicPolicies().getPersistence(topicName);
+            assertEquals(retrievedPolicies, updatedPolicies);
+            
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
updatedMarkDeleteRate);
+        });
+        
+        // Wait for the managed ledger config to be updated
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getConfig().getEnsembleSize(), 
updatedEnsembleSize);
+            assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
updatedWriteQuorum);
+            assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
updatedAckQuorum);
+            assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
updatedMarkDeleteRate, 0.0001);
+            assertEquals(cursor.getThrottleMarkDelete(), 
updatedMarkDeleteRate, 0.0001);
+        });
+        
+        // Test 5: Test managedLedgerMaxMarkDeleteRate=-1 which should fall 
back to namespace/broker defaults
+        final double fallbackMarkDeleteRate = -1.0;
+        final int fallbackEnsembleSize = 5;
+        final int fallbackWriteQuorum = 3;
+        final int fallbackAckQuorum = 2;
+        
+        PersistencePolicies fallbackPolicies = new PersistencePolicies(
+            fallbackEnsembleSize, fallbackWriteQuorum, fallbackAckQuorum, 
fallbackMarkDeleteRate);
+        
+        admin.topicPolicies().setPersistence(topicName, fallbackPolicies);
+        
+        // Verify the policies are set correctly
+        Awaitility.await().untilAsserted(() -> {
+            PersistencePolicies retrievedPolicies = 
admin.topicPolicies().getPersistence(topicName);
+            assertEquals(retrievedPolicies, fallbackPolicies);
+            
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(), 
fallbackMarkDeleteRate);
+        });
+        
+        // Wait for the managed ledger config to be updated with fallback 
values
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getConfig().getEnsembleSize(), 
fallbackEnsembleSize);
+            assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
fallbackWriteQuorum);
+            assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
fallbackAckQuorum);
+            // Should fall back to broker default for mark delete rate
+            assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+            assertEquals(cursor.getThrottleMarkDelete(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+        });
+        
+        // Test 6: Set namespace level persistence policies and verify topic 
level takes precedence
+        final double namespaceMarkDeleteRate = 55;
+        final int namespaceEnsembleSize = 3;
+        final int namespaceWriteQuorum = 2;
+        final int namespaceAckQuorum = 1;
+        
+        PersistencePolicies namespacePolicies = new PersistencePolicies(
+            namespaceEnsembleSize, namespaceWriteQuorum, namespaceAckQuorum, 
namespaceMarkDeleteRate);
+        
+        admin.namespaces().setPersistence(myNamespace, namespacePolicies);
+        
+        // Set topic level policies that should override namespace level
+        final double topicMarkDeleteRate = 95;
+        final int topicEnsembleSize = 3;
+        final int topicWriteQuorum = 2;
+        final int topicAckQuorum = 1;
+        
+        PersistencePolicies topicPolicies = new PersistencePolicies(
+            topicEnsembleSize, topicWriteQuorum, topicAckQuorum, 
topicMarkDeleteRate);
+        
+        admin.topicPolicies().setPersistence(topicName, topicPolicies);
+        
+        // Verify topic level policies take precedence over namespace level
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getConfig().getEnsembleSize(), 
topicEnsembleSize);
+            assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
topicWriteQuorum);
+            assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
topicAckQuorum);
+            assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
topicMarkDeleteRate, 0.0001);
+            assertEquals(cursor.getThrottleMarkDelete(), topicMarkDeleteRate, 
0.0001);
+        });
+        
+        // Test 7: Remove topic persistence policies and verify fallback to 
namespace level
+        admin.topicPolicies().removePersistence(topicName);
+        
+        // Verify topic policies are removed
+        Awaitility.await().untilAsserted(() -> 
+            assertNull(admin.topicPolicies().getPersistence(topicName)));
+        
+        // Verify managed ledger config falls back to namespace level policies
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getConfig().getEnsembleSize(), 
namespaceEnsembleSize);
+            assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
namespaceWriteQuorum);
+            assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
namespaceAckQuorum);
+            assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
namespaceMarkDeleteRate, 0.0001);
+            assertEquals(cursor.getThrottleMarkDelete(), 
namespaceMarkDeleteRate, 0.0001);
+        });
+        
+        // Test 8: Remove namespace persistence policies and verify fallback 
to broker defaults
+        admin.namespaces().removePersistence(myNamespace);
+        
+        // Verify namespace policies are removed
+        Awaitility.await().untilAsserted(() -> 
+            assertNull(admin.namespaces().getPersistence(myNamespace)));
+        
+        // Verify managed ledger config falls back to broker defaults
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getConfig().getEnsembleSize(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize());
+            assertEquals(managedLedger.getConfig().getWriteQuorumSize(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum());
+            assertEquals(managedLedger.getConfig().getAckQuorumSize(), 
+                pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum());
+            assertEquals(managedLedger.getConfig().getThrottleMarkDelete(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+            assertEquals(cursor.getThrottleMarkDelete(), 
+                
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+        });
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
index 3fbc91e7d2e..70ff2ab8a4e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
@@ -33,7 +33,7 @@ public class PersistencePolicies {
     private String managedLedgerStorageClassName;
 
     public PersistencePolicies() {
-        this(2, 2, 2, 0.0, null);
+        this(2, 2, 2, -1, null);
     }
 
     public PersistencePolicies(int bookkeeperEnsemble, int 
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 8adedcd14ac..51712ece5b7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1376,8 +1376,9 @@ public class CmdNamespaces extends CmdBase {
 
         @Option(names = { "-r",
                 "--ml-mark-delete-max-rate" },
-                description = "Throttling rate of mark-delete operation (0 
means no throttle)")
-        private double managedLedgerMaxMarkDeleteRate = 0;
+                description = "Throttling rate of mark-delete operation "
+                        + "(0 means no throttle, -1 means unset which will use 
the default configuration from broker)")
+        private double managedLedgerMaxMarkDeleteRate = -1;
 
         @Option(names = { "-c",
                 "--ml-storage-class" },
@@ -1391,9 +1392,6 @@ public class CmdNamespaces extends CmdBase {
                 throw new ParameterException("[--bookkeeper-ensemble], 
[--bookkeeper-write-quorum] "
                         + "and [--bookkeeper-ack-quorum] must greater than 
0.");
             }
-            if (managedLedgerMaxMarkDeleteRate < 0) {
-                throw new ParameterException("[--ml-mark-delete-max-rate] 
cannot less than 0.");
-            }
             getAdmin().namespaces().setPersistence(namespace, new 
PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate,
                     managedLedgerStorageClassName));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 9a5714cb58c..7b7883af9ac 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1188,9 +1188,12 @@ public class CmdTopicPolicies extends CmdBase {
                 description = "Number of acks (guaranteed copies) to wait for 
each entry")
         private int bookkeeperAckQuorum = 2;
 
-        @Option(names = { "-r", "--ml-mark-delete-max-rate" },
-                description = "Throttling rate of mark-delete operation (0 
means no throttle)")
-        private double managedLedgerMaxMarkDeleteRate = 0;
+        @Option(names = { "-r",
+                "--ml-mark-delete-max-rate" },
+                description = "Throttling rate of mark-delete operation "
+                        + "(0 means no throttle, -1 means unset which will use 
"
+                        + "the configuration from namespace or broker)")
+        private double managedLedgerMaxMarkDeleteRate = -1;
 
         @Option(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
                 + "If set to true, the policy will be replicate to other 
clusters asynchronously", arity = "0")
@@ -1208,9 +1211,6 @@ public class CmdTopicPolicies extends CmdBase {
                 throw new ParameterException("[--bookkeeper-ensemble], [--bookkeeper-write-quorum] "
                         + "and [--bookkeeper-ack-quorum] must greater than 0.");
             }
-            if (managedLedgerMaxMarkDeleteRate < 0) {
-                throw new ParameterException("[--ml-mark-delete-max-rate] cannot less than 0.");
-            }
             getTopicPolicies(isGlobal).setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate,
                     managedLedgerStorageClassName));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e123db66fa7..e29a4dcd7a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2153,8 +2153,8 @@ public class CmdTopics extends CmdBase {
 
         @Option(names = { "-r",
                 "--ml-mark-delete-max-rate" }, description = "Throttling rate of mark-delete operation "
-                + "(0 means no throttle)")
-        private double managedLedgerMaxMarkDeleteRate = 0;
+                + "(0 means no throttle, -1 means unset which will use the configuration from namespace or broker)")
+        private double managedLedgerMaxMarkDeleteRate = -1;
 
         @Option(names = { "-c",
                 "--ml-storage-class" },
@@ -2168,9 +2168,6 @@ public class CmdTopics extends CmdBase {
                 throw new ParameterException("[--bookkeeper-ensemble], [--bookkeeper-write-quorum] "
                         + "and [--bookkeeper-ack-quorum] must greater than 0.");
             }
-            if (managedLedgerMaxMarkDeleteRate < 0) {
-                throw new ParameterException("[--ml-mark-delete-max-rate] cannot less than 0.");
-            }
             getTopics().setPersistence(persistentTopic, new PersistencePolicies(bookkeeperEnsemble,
                     bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate,
                     managedLedgerStorageClassName));
@@ -2509,7 +2506,7 @@ public class CmdTopics extends CmdBase {
 
         @Option(names = { "--dispatch-rate-period",
             "-dt" }, description = "dispatch-rate-period in second type"
-                + " (default 1 second will be overwrite if not passed)")
+                + " (default 1 second will be overwrite if not passed)")
         private int dispatchRatePeriodSec = 1;
 
         @Option(names = { "--relative-to-publish-rate",
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
index 59d3be39019..6a0b6f6142d 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.testng.annotations.Test;
 
 public class CmdTopicPoliciesTest {
@@ -44,4 +45,100 @@ public class CmdTopicPoliciesTest {
         verify(topicPolicies, times(1)).setRetention("persistent://public/default/topic",
                 new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
     }
+
+    @Test
+    public void testSetPersistenceWithDefaultMarkDeleteRate() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test that the default value is now -1 (unset) instead of 0
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -1.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithNegativeMarkDeleteRate() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test that negative values are now allowed (previously would throw exception)
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r -5.0".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -5.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithZeroMarkDeleteRate() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test that zero is still allowed
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r 0".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, 0.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithPositiveMarkDeleteRate() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test that positive values still work
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r 10.5".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, 10.5, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithUnsetMarkDeleteRate() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test explicitly setting to -1 (unset)
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r -1".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -1.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithGlobalFlag() throws Exception {
+        TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        when(admin.topicPolicies(true)).thenReturn(topicPolicies);
+
+        CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+        // Test with global flag
+        cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r -1 -g".split("\\s+"));
+
+        verify(topicPolicies, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -1.0, null));
+    }
 }
\ No newline at end of file
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
index bd926edc5a8..918051c0a49 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -267,4 +268,44 @@ public class TestCmdTopics {
         verify(mockTopics, times(1)).setRetention("persistent://public/default/topic",
                 new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
     }
+
+    @Test
+    public void testSetPersistenceWithDefaultMarkDeleteRate() throws Exception {
+        // Test that the default value is now -1 (unset) instead of 0
+        cmdTopics.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2".split("\\s+"));
+        verify(mockTopics, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -1.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithNegativeMarkDeleteRate() throws Exception {
+        // Test that negative values are now allowed (previously would throw exception)
+        cmdTopics.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r -5.0".split("\\s+"));
+        verify(mockTopics, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -5.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithZeroMarkDeleteRate() throws Exception {
+        // Test that zero is still allowed
+        cmdTopics.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r 0".split("\\s+"));
+        verify(mockTopics, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, 0.0, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithPositiveMarkDeleteRate() throws Exception {
+        // Test that positive values still work
+        cmdTopics.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r 10.5".split("\\s+"));
+        verify(mockTopics, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, 10.5, null));
+    }
+
+    @Test
+    public void testSetPersistenceWithUnsetMarkDeleteRate() throws Exception {
+        // Test explicitly setting to -1 (unset)
+        cmdTopics.run("set-persistence persistent://public/default/topic -e 2 -w 2 -a 2 -r -1".split("\\s+"));
+        verify(mockTopics, times(1)).setPersistence("persistent://public/default/topic",
+                new PersistencePolicies(2, 2, 2, -1.0, null));
+    }
 }


Reply via email to