This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53119d01fb0 [improve][broker] PIP427:Align pulsar-admin Default for
Mark-Delete Rate with Broker Configuration (#24470)
53119d01fb0 is described below
commit 53119d01fb01c2c25e31a5e80a2ee663d517b088
Author: Penghui Li <[email protected]>
AuthorDate: Mon Jul 7 19:28:13 2025 -0700
[improve][broker] PIP427:Align pulsar-admin Default for Mark-Delete Rate
with Broker Configuration (#24470)
---
.../pulsar/broker/service/BrokerService.java | 4 +-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 214 +++++++++++++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 194 +++++++++++++++++++
.../common/policies/data/PersistencePolicies.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 8 +-
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 12 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 9 +-
.../pulsar/admin/cli/CmdTopicPoliciesTest.java | 97 ++++++++++
.../org/apache/pulsar/admin/cli/TestCmdTopics.java | 41 ++++
9 files changed, 562 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 359c0daf5b8..6124c913b26 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2056,7 +2056,9 @@ public class BrokerService implements Closeable {
}
}
-
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
>= 0
+ ? persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
+ :
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 8586b54e20d..49faa19fa09 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -654,6 +654,220 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertNotNull(topic);
}
+ /**
+ * Test namespace level persistence policies and verify that the internal
managed ledger
+ * has applied the correct managedLedgerMaxMarkDeleteRate.
+ *
+ * This test verifies:
+ * 1. Setting namespace persistence policies with
managedLedgerMaxMarkDeleteRate
+ * 2. Creating a topic and verifying the managed ledger config is applied
correctly
+ * 3. Verifying that cursors have the correct throttle mark delete rate
+ * 4. Testing policy updates and verifying they are applied to existing
topics
+ * 5. Testing policy removal and fallback to broker defaults
+ * 6. Testing managedLedgerMaxMarkDeleteRate=-1 which should fall back to
broker defaults
+ *
+ * @throws Exception
+ */
+ @Test()
+ public void
testNamespacePersistencePoliciesWithManagedLedgerMaxMarkDeleteRate() throws
Exception {
+ final String namespace = newUniqueName(defaultTenant +
"/ns-persistence-test");
+ final String topicName = "persistent://" + namespace + "/test-topic";
+
+ // Create namespace
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+
+ // Test 1: Set initial persistence policies with
managedLedgerMaxMarkDeleteRate
+ final double initialMarkDeleteRate = 25.0;
+ final int initialEnsembleSize = 3;
+ final int initialWriteQuorum = 2;
+ final int initialAckQuorum = 1;
+
+ PersistencePolicies initialPolicies = new PersistencePolicies(
+ initialEnsembleSize, initialWriteQuorum, initialAckQuorum,
initialMarkDeleteRate);
+
+ admin.namespaces().setPersistence(namespace, initialPolicies);
+
+ // Verify the policies are set correctly
+ PersistencePolicies retrievedPolicies =
admin.namespaces().getPersistence(namespace);
+ assertEquals(retrievedPolicies, initialPolicies);
+ assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
initialMarkDeleteRate);
+
+ // Test 2: Create a topic and verify managed ledger config is applied
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Get the topic and managed ledger
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+
+ // Verify managed ledger config has the correct settings
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
initialEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
initialWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
initialAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
initialMarkDeleteRate);
+
+ // Test 3: Verify cursor has the correct throttle mark delete rate
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
managedLedger.getCursors().iterator().next();
+ assertEquals(cursor.getThrottleMarkDelete(), initialMarkDeleteRate);
+
+ // Test 4: Update persistence policies and verify they are applied
+ final double updatedMarkDeleteRate = 75.0;
+ final int updatedEnsembleSize = 5;
+ final int updatedWriteQuorum = 3;
+ final int updatedAckQuorum = 2;
+
+ PersistencePolicies updatedPolicies = new PersistencePolicies(
+ updatedEnsembleSize, updatedWriteQuorum, updatedAckQuorum,
updatedMarkDeleteRate);
+
+ admin.namespaces().setPersistence(namespace, updatedPolicies);
+
+ // Verify the policies are updated correctly
+ retrievedPolicies = admin.namespaces().getPersistence(namespace);
+ assertEquals(retrievedPolicies, updatedPolicies);
+ assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
updatedMarkDeleteRate);
+
+ // Wait for the managed ledger config to be updated
+ retryStrategically((test) ->
+ managedLedger.getConfig().getEnsembleSize() == updatedEnsembleSize
&&
+ managedLedger.getConfig().getWriteQuorumSize() ==
updatedWriteQuorum &&
+ managedLedger.getConfig().getAckQuorumSize() == updatedAckQuorum &&
+ managedLedger.getConfig().getThrottleMarkDelete() ==
updatedMarkDeleteRate &&
+ cursor.getThrottleMarkDelete() == updatedMarkDeleteRate,
+ 10, 200);
+
+ // Verify managed ledger config has been updated
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
updatedEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
updatedWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
updatedAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
updatedMarkDeleteRate);
+
+ // Verify cursor throttle mark delete rate has been updated
+ assertEquals(cursor.getThrottleMarkDelete(), updatedMarkDeleteRate);
+
+ // Test 5: Create a new topic and verify it uses the updated policies
+ final String newTopicName = "persistent://" + namespace +
"/new-test-topic";
+ @Cleanup
+ Producer<byte[]> newProducer = pulsarClient.newProducer()
+ .topic(newTopicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ @Cleanup
+ Consumer<byte[]> newConsumer = pulsarClient.newConsumer()
+ .topic(newTopicName)
+ .subscriptionName("new-test-sub")
+ .subscribe();
+
+ PersistentTopic newTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(newTopicName).get();
+ ManagedLedgerImpl newManagedLedger = (ManagedLedgerImpl)
newTopic.getManagedLedger();
+ ManagedCursorImpl newCursor = (ManagedCursorImpl)
newManagedLedger.getCursors().iterator().next();
+
+ // Verify new topic uses updated policies
+ assertEquals(newManagedLedger.getConfig().getEnsembleSize(),
updatedEnsembleSize);
+ assertEquals(newManagedLedger.getConfig().getWriteQuorumSize(),
updatedWriteQuorum);
+ assertEquals(newManagedLedger.getConfig().getAckQuorumSize(),
updatedAckQuorum);
+ assertEquals(newManagedLedger.getConfig().getThrottleMarkDelete(),
updatedMarkDeleteRate);
+ assertEquals(newCursor.getThrottleMarkDelete(), updatedMarkDeleteRate);
+
+ // Test 6: Test managedLedgerMaxMarkDeleteRate=-1 which should fall
back to broker defaults
+ final double fallbackMarkDeleteRate = -1.0;
+ final int fallbackEnsembleSize = 4;
+ final int fallbackWriteQuorum = 2;
+ final int fallbackAckQuorum = 1;
+
+ PersistencePolicies fallbackPolicies = new PersistencePolicies(
+ fallbackEnsembleSize, fallbackWriteQuorum, fallbackAckQuorum,
fallbackMarkDeleteRate);
+
+ admin.namespaces().setPersistence(namespace, fallbackPolicies);
+
+ // Verify the policies are set correctly
+ retrievedPolicies = admin.namespaces().getPersistence(namespace);
+ assertEquals(retrievedPolicies, fallbackPolicies);
+ assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
fallbackMarkDeleteRate);
+
+ // Create a topic to verify it uses broker defaults for mark delete
rate
+ final String fallbackTopicName = "persistent://" + namespace +
"/fallback-test-topic";
+ @Cleanup
+ Producer<byte[]> fallbackProducer = pulsarClient.newProducer()
+ .topic(fallbackTopicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ @Cleanup
+ Consumer<byte[]> fallbackConsumer = pulsarClient.newConsumer()
+ .topic(fallbackTopicName)
+ .subscriptionName("fallback-test-sub")
+ .subscribe();
+
+ PersistentTopic fallbackTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(fallbackTopicName).get();
+ ManagedLedgerImpl fallbackManagedLedger = (ManagedLedgerImpl)
fallbackTopic.getManagedLedger();
+ ManagedCursorImpl fallbackCursor = (ManagedCursorImpl)
fallbackManagedLedger.getCursors().iterator().next();
+
+ // Verify new topic uses fallback policies (broker defaults for mark
delete rate)
+ assertEquals(fallbackManagedLedger.getConfig().getEnsembleSize(),
fallbackEnsembleSize);
+ assertEquals(fallbackManagedLedger.getConfig().getWriteQuorumSize(),
fallbackWriteQuorum);
+ assertEquals(fallbackManagedLedger.getConfig().getAckQuorumSize(),
fallbackAckQuorum);
+
assertEquals(fallbackManagedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(fallbackCursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+
+ // Test 7: Remove namespace persistence policies and verify fallback
to broker defaults
+ admin.namespaces().removePersistence(namespace);
+
+ // Verify policies are removed
+ assertNull(admin.namespaces().getPersistence(namespace));
+
+ // Create another topic to verify it uses broker defaults
+ final String defaultTopicName = "persistent://" + namespace +
"/default-test-topic";
+ @Cleanup
+ Producer<byte[]> defaultProducer = pulsarClient.newProducer()
+ .topic(defaultTopicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ @Cleanup
+ Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer()
+ .topic(defaultTopicName)
+ .subscriptionName("default-test-sub")
+ .subscribe();
+
+ PersistentTopic defaultTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(defaultTopicName).get();
+ ManagedLedgerImpl defaultManagedLedger = (ManagedLedgerImpl)
defaultTopic.getManagedLedger();
+ ManagedCursorImpl defaultCursor = (ManagedCursorImpl)
defaultManagedLedger.getCursors().iterator().next();
+
+ // Verify new topic uses broker defaults
+ assertEquals(defaultManagedLedger.getConfig().getEnsembleSize(),
+ pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize());
+ assertEquals(defaultManagedLedger.getConfig().getWriteQuorumSize(),
+ pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum());
+ assertEquals(defaultManagedLedger.getConfig().getAckQuorumSize(),
+ pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum());
+ assertEquals(defaultManagedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(defaultCursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+
+ // Test 8: Verify that existing topic uses broker defaults
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(cursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(newManagedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ assertEquals(newCursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
+ }
+
private void unloadTopic(String topicName) throws Exception {
admin.topics().unload(topicName);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index dc412eef802..4bf3cfcb44b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -50,6 +50,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -69,6 +70,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -3883,4 +3885,196 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(topic, false);
}
}
+
+ /**
+ * Test topic level persistence policies and verify that the internal
managed ledger
+ * has applied the correct managedLedgerMaxMarkDeleteRate.
+ *
+ * This test verifies:
+ * 1. Setting topic persistence policies with
managedLedgerMaxMarkDeleteRate
+ * 2. Creating a topic and verifying the managed ledger config is applied
correctly
+ * 3. Verifying that cursors have the correct throttle mark delete rate
+ * 4. Testing policy updates and verifying they are applied to existing
topics
+ * 5. Testing policy removal and fallback to broker defaults
+ * 6. Testing managedLedgerMaxMarkDeleteRate=-1 which should fall back to
broker defaults
+ *
+ * @throws Exception
+ */
+ @Test
+ public void
testTopicPersistencePoliciesWithManagedLedgerMaxMarkDeleteRate() throws
Exception {
+ final String topicName = "persistent://" + myNamespace +
"/test-topic-persistence-policies";
+
+ // Create topic
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // Test 1: Set initial topic persistence policies with
managedLedgerMaxMarkDeleteRate
+ final double initialMarkDeleteRate = 25;
+ final int initialEnsembleSize = 4;
+ final int initialWriteQuorum = 3;
+ final int initialAckQuorum = 2;
+
+ PersistencePolicies initialPolicies = new PersistencePolicies(
+ initialEnsembleSize, initialWriteQuorum, initialAckQuorum,
initialMarkDeleteRate);
+
+ admin.topicPolicies().setPersistence(topicName, initialPolicies);
+
+ // Verify the policies are set correctly
+ Awaitility.await().untilAsserted(() -> {
+ PersistencePolicies retrievedPolicies =
admin.topicPolicies().getPersistence(topicName);
+ assertEquals(retrievedPolicies, initialPolicies);
+
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
initialMarkDeleteRate);
+ });
+
+ // Test 2: Create producer and consumer to load the topic and verify
managed ledger config
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Get the topic and managed ledger
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+
+ // Verify managed ledger config has the correct settings
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
initialEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
initialWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
initialAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
initialMarkDeleteRate, 0.0001);
+
+ // Test 3: Verify cursor has the correct throttle mark delete rate
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
managedLedger.getCursors().iterator().next();
+ assertEquals(cursor.getThrottleMarkDelete(), initialMarkDeleteRate,
0.0001);
+
+ // Test 4: Update topic persistence policies and verify they are
applied
+ final double updatedMarkDeleteRate = 75;
+ final int updatedEnsembleSize = 3;
+ final int updatedWriteQuorum = 2;
+ final int updatedAckQuorum = 1;
+
+ PersistencePolicies updatedPolicies = new PersistencePolicies(
+ updatedEnsembleSize, updatedWriteQuorum, updatedAckQuorum,
updatedMarkDeleteRate);
+
+ admin.topicPolicies().setPersistence(topicName, updatedPolicies);
+
+ // Verify the policies are updated correctly
+ Awaitility.await().untilAsserted(() -> {
+ PersistencePolicies retrievedPolicies =
admin.topicPolicies().getPersistence(topicName);
+ assertEquals(retrievedPolicies, updatedPolicies);
+
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
updatedMarkDeleteRate);
+ });
+
+ // Wait for the managed ledger config to be updated
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
updatedEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
updatedWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
updatedAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
updatedMarkDeleteRate, 0.0001);
+ assertEquals(cursor.getThrottleMarkDelete(),
updatedMarkDeleteRate, 0.0001);
+ });
+
+ // Test 5: Test managedLedgerMaxMarkDeleteRate=-1 which should fall
back to namespace/broker defaults
+ final double fallbackMarkDeleteRate = -1.0;
+ final int fallbackEnsembleSize = 5;
+ final int fallbackWriteQuorum = 3;
+ final int fallbackAckQuorum = 2;
+
+ PersistencePolicies fallbackPolicies = new PersistencePolicies(
+ fallbackEnsembleSize, fallbackWriteQuorum, fallbackAckQuorum,
fallbackMarkDeleteRate);
+
+ admin.topicPolicies().setPersistence(topicName, fallbackPolicies);
+
+ // Verify the policies are set correctly
+ Awaitility.await().untilAsserted(() -> {
+ PersistencePolicies retrievedPolicies =
admin.topicPolicies().getPersistence(topicName);
+ assertEquals(retrievedPolicies, fallbackPolicies);
+
assertEquals(retrievedPolicies.getManagedLedgerMaxMarkDeleteRate(),
fallbackMarkDeleteRate);
+ });
+
+ // Wait for the managed ledger config to be updated with fallback
values
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
fallbackEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
fallbackWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
fallbackAckQuorum);
+ // Should fall back to broker default for mark delete rate
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+ assertEquals(cursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+ });
+
+ // Test 6: Set namespace level persistence policies and verify topic
level takes precedence
+ final double namespaceMarkDeleteRate = 55;
+ final int namespaceEnsembleSize = 3;
+ final int namespaceWriteQuorum = 2;
+ final int namespaceAckQuorum = 1;
+
+ PersistencePolicies namespacePolicies = new PersistencePolicies(
+ namespaceEnsembleSize, namespaceWriteQuorum, namespaceAckQuorum,
namespaceMarkDeleteRate);
+
+ admin.namespaces().setPersistence(myNamespace, namespacePolicies);
+
+ // Set topic level policies that should override namespace level
+ final double topicMarkDeleteRate = 95;
+ final int topicEnsembleSize = 3;
+ final int topicWriteQuorum = 2;
+ final int topicAckQuorum = 1;
+
+ PersistencePolicies topicPolicies = new PersistencePolicies(
+ topicEnsembleSize, topicWriteQuorum, topicAckQuorum,
topicMarkDeleteRate);
+
+ admin.topicPolicies().setPersistence(topicName, topicPolicies);
+
+ // Verify topic level policies take precedence over namespace level
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
topicEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
topicWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
topicAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
topicMarkDeleteRate, 0.0001);
+ assertEquals(cursor.getThrottleMarkDelete(), topicMarkDeleteRate,
0.0001);
+ });
+
+ // Test 7: Remove topic persistence policies and verify fallback to
namespace level
+ admin.topicPolicies().removePersistence(topicName);
+
+ // Verify topic policies are removed
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin.topicPolicies().getPersistence(topicName)));
+
+ // Verify managed ledger config falls back to namespace level policies
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
namespaceEnsembleSize);
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
namespaceWriteQuorum);
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
namespaceAckQuorum);
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
namespaceMarkDeleteRate, 0.0001);
+ assertEquals(cursor.getThrottleMarkDelete(),
namespaceMarkDeleteRate, 0.0001);
+ });
+
+ // Test 8: Remove namespace persistence policies and verify fallback
to broker defaults
+ admin.namespaces().removePersistence(myNamespace);
+
+ // Verify namespace policies are removed
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin.namespaces().getPersistence(myNamespace)));
+
+ // Verify managed ledger config falls back to broker defaults
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getConfig().getEnsembleSize(),
+
pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize());
+ assertEquals(managedLedger.getConfig().getWriteQuorumSize(),
+
pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum());
+ assertEquals(managedLedger.getConfig().getAckQuorumSize(),
+ pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum());
+ assertEquals(managedLedger.getConfig().getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+ assertEquals(cursor.getThrottleMarkDelete(),
+
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit(), 0.0001);
+ });
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
index 3fbc91e7d2e..70ff2ab8a4e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
@@ -33,7 +33,7 @@ public class PersistencePolicies {
private String managedLedgerStorageClassName;
public PersistencePolicies() {
- this(2, 2, 2, 0.0, null);
+ this(2, 2, 2, -1, null);
}
public PersistencePolicies(int bookkeeperEnsemble, int
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 8adedcd14ac..51712ece5b7 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1376,8 +1376,9 @@ public class CmdNamespaces extends CmdBase {
@Option(names = { "-r",
"--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)")
- private double managedLedgerMaxMarkDeleteRate = 0;
+ description = "Throttling rate of mark-delete operation "
+ + "(0 means no throttle, -1 means unset which will use
the default configuration from broker)")
+ private double managedLedgerMaxMarkDeleteRate = -1;
@Option(names = { "-c",
"--ml-storage-class" },
@@ -1391,9 +1392,6 @@ public class CmdNamespaces extends CmdBase {
throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ "and [--bookkeeper-ack-quorum] must greater than
0.");
}
- if (managedLedgerMaxMarkDeleteRate < 0) {
- throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
- }
getAdmin().namespaces().setPersistence(namespace, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
managedLedgerStorageClassName));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 9a5714cb58c..7b7883af9ac 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1188,9 +1188,12 @@ public class CmdTopicPolicies extends CmdBase {
description = "Number of acks (guaranteed copies) to wait for
each entry")
private int bookkeeperAckQuorum = 2;
- @Option(names = { "-r", "--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)")
- private double managedLedgerMaxMarkDeleteRate = 0;
+ @Option(names = { "-r",
+ "--ml-mark-delete-max-rate" },
+ description = "Throttling rate of mark-delete operation "
+ + "(0 means no throttle, -1 means unset which will use
"
+ + "the configuration from namespace or broker)")
+ private double managedLedgerMaxMarkDeleteRate = -1;
@Option(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ "If set to true, the policy will be replicate to other
clusters asynchronously", arity = "0")
@@ -1208,9 +1211,6 @@ public class CmdTopicPolicies extends CmdBase {
throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ "and [--bookkeeper-ack-quorum] must greater than
0.");
}
- if (managedLedgerMaxMarkDeleteRate < 0) {
- throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
- }
getTopicPolicies(isGlobal).setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
managedLedgerStorageClassName));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e123db66fa7..e29a4dcd7a8 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2153,8 +2153,8 @@ public class CmdTopics extends CmdBase {
@Option(names = { "-r",
"--ml-mark-delete-max-rate" }, description = "Throttling rate
of mark-delete operation "
- + "(0 means no throttle)")
- private double managedLedgerMaxMarkDeleteRate = 0;
+ + "(0 means no throttle, -1 means unset which will use the
configuration from namespace or broker)")
+ private double managedLedgerMaxMarkDeleteRate = -1;
@Option(names = { "-c",
"--ml-storage-class" },
@@ -2168,9 +2168,6 @@ public class CmdTopics extends CmdBase {
throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ "and [--bookkeeper-ack-quorum] must greater than
0.");
}
- if (managedLedgerMaxMarkDeleteRate < 0) {
- throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
- }
getTopics().setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
managedLedgerStorageClassName));
@@ -2509,7 +2506,7 @@ public class CmdTopics extends CmdBase {
@Option(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type"
- + " (default 1 second will be overwrite if not passed)")
+ + "(default 1 second will be overwrite if not passed)")
private int dispatchRatePeriodSec = 1;
@Option(names = { "--relative-to-publish-rate",
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
index 59d3be39019..6a0b6f6142d 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/CmdTopicPoliciesTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.testng.annotations.Test;
public class CmdTopicPoliciesTest {
@@ -44,4 +45,100 @@ public class CmdTopicPoliciesTest {
verify(topicPolicies,
times(1)).setRetention("persistent://public/default/topic",
new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
}
+
+ @Test
+ public void testSetPersistenceWithDefaultMarkDeleteRate() throws Exception
{
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test that the default value is now -1 (unset) instead of 0
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -1.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithNegativeMarkDeleteRate() throws
Exception {
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test that negative values are now allowed (previously would throw
exception)
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2 -r -5.0".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -5.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithZeroMarkDeleteRate() throws Exception {
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test that zero is still allowed
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2 -r 0".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, 0.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithPositiveMarkDeleteRate() throws
Exception {
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test that positive values still work
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2 -r 10.5".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, 10.5, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithUnsetMarkDeleteRate() throws Exception {
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(anyBoolean())).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test explicitly setting to -1 (unset)
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2 -r -1".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -1.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithGlobalFlag() throws Exception {
+ TopicPolicies topicPolicies = mock(TopicPolicies.class);
+
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(admin.topicPolicies(true)).thenReturn(topicPolicies);
+
+ CmdTopicPolicies cmd = new CmdTopicPolicies(() -> admin);
+
+ // Test with global flag
+ cmd.run("set-persistence persistent://public/default/topic -e 2 -w 2
-a 2 -r -1 -g".split("\\s+"));
+
+ verify(topicPolicies,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -1.0, null));
+ }
}
\ No newline at end of file
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
index bd926edc5a8..918051c0a49 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -267,4 +268,44 @@ public class TestCmdTopics {
verify(mockTopics,
times(1)).setRetention("persistent://public/default/topic",
new RetentionPolicies(200 * 24 * 60, 2 * 1024 * 1024));
}
+
+ @Test
+ public void testSetPersistenceWithDefaultMarkDeleteRate() throws Exception
{
+ // Test that the default value is now -1 (unset) instead of 0
+ cmdTopics.run("set-persistence persistent://public/default/topic -e 2
-w 2 -a 2".split("\\s+"));
+ verify(mockTopics,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -1.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithNegativeMarkDeleteRate() throws
Exception {
+ // Test that negative values are now allowed (previously would throw
exception)
+ cmdTopics.run("set-persistence persistent://public/default/topic -e 2
-w 2 -a 2 -r -5.0".split("\\s+"));
+ verify(mockTopics,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -5.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithZeroMarkDeleteRate() throws Exception {
+ // Test that zero is still allowed
+ cmdTopics.run("set-persistence persistent://public/default/topic -e 2
-w 2 -a 2 -r 0".split("\\s+"));
+ verify(mockTopics,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, 0.0, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithPositiveMarkDeleteRate() throws
Exception {
+ // Test that positive values still work
+ cmdTopics.run("set-persistence persistent://public/default/topic -e 2
-w 2 -a 2 -r 10.5".split("\\s+"));
+ verify(mockTopics,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, 10.5, null));
+ }
+
+ @Test
+ public void testSetPersistenceWithUnsetMarkDeleteRate() throws Exception {
+ // Test explicitly setting to -1 (unset)
+ cmdTopics.run("set-persistence persistent://public/default/topic -e 2
-w 2 -a 2 -r -1".split("\\s+"));
+ verify(mockTopics,
times(1)).setPersistence("persistent://public/default/topic",
+ new PersistencePolicies(2, 2, 2, -1.0, null));
+ }
}