This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 22eeddd8ddc [fix][broker]Fix dirty reading of namespace level offload thresholds (#24696) 22eeddd8ddc is described below commit 22eeddd8ddc7aae85b069328d7a7344866ac3c09 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu Sep 4 16:45:22 2025 +0800 [fix][broker]Fix dirty reading of namespace level offload thresholds (#24696) (cherry picked from commit 96bc370e6b9a10e1656983d522e272a36a44fdae) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 68 +++++++++++++++++----- .../apache/pulsar/broker/admin/AdminApi2Test.java | 51 ++++++++++++++++ 2 files changed, 104 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d393a71da72..0e821c83e61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2182,7 +2182,7 @@ public abstract class NamespacesBase extends AdminResource { policies.offload_policies = new OffloadPoliciesImpl(); } ((OffloadPoliciesImpl) policies.offload_policies).setManagedLedgerOffloadThresholdInBytes(newThreshold); - policies.offload_threshold = newThreshold; + mergeOffloadThresholdsForCompatibility(policies); return policies; }); log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}", @@ -2209,7 +2209,7 @@ public abstract class NamespacesBase extends AdminResource { } ((OffloadPoliciesImpl) policies.offload_policies) .setManagedLedgerOffloadThresholdInSeconds(newThreshold); - policies.offload_threshold_in_seconds = newThreshold; + mergeOffloadThresholdsForCompatibility(policies); return policies; }) ) @@ -2239,7 +2239,7 @@ public abstract class NamespacesBase extends AdminResource { } ((OffloadPoliciesImpl) policies.offload_policies) .setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs); - policies.offload_deletion_lag_ms = newDeletionLagMs; + mergeOffloadThresholdsForCompatibility(policies); return policies; }); log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}", @@ -2352,6 +2352,50 @@ public abstract class NamespacesBase extends AdminResource { } } + /** + * Before https://github.com/apache/pulsar/pull/6183, users can set broker level offload policies, and handle + * namespace-level thresholds by the following fields: + * - {@link Policies#offload_deletion_lag_ms} + * - {@link Policies#offload_threshold} + * - {@link Policies#offload_threshold_in_seconds} + * + * After https://github.com/apache/pulsar/pull/6183, Pulsar supports namespace-level policies, which was + * supported by {@link Policies#offload_policies}. And the thresholds were moved to the following fields: + * - {@link Policies#offload_policies} -> {@link OffloadPoliciesImpl#getManagedLedgerOffloadDeletionLagInMillis} + * - {@link Policies#offload_policies} -> {@link OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInBytes} + * - {@link Policies#offload_policies} -> {@link OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInSeconds} + * + * To make the offload policies compatible with the old policies, uses the old policies if the new policies + * are not set. Once the new fields are set, the old fields will be ignored. + */ + private void mergeOffloadThresholdsForCompatibility(Policies nsPolicies) { + Long oldOffloadDeletionLagMs = nsPolicies.offload_deletion_lag_ms; + Long oldOffloadThresholdInBytes = nsPolicies.offload_threshold; + Long odlOffloadThresholdInSeconds = nsPolicies.offload_threshold_in_seconds; + // If the old values are empty, skip. + if (oldOffloadDeletionLagMs == null && oldOffloadThresholdInBytes == -1 && odlOffloadThresholdInSeconds == -1) { + return; + } + // If the new values are empty, use the old values. + OffloadPoliciesImpl nsOffloadPolicies = (OffloadPoliciesImpl) nsPolicies.offload_policies; + if (Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), + OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS) && oldOffloadDeletionLagMs != null) { + nsOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(oldOffloadDeletionLagMs); + } + if (Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInBytes(), + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) && oldOffloadThresholdInBytes != -1) { + nsOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(oldOffloadThresholdInBytes); + } + if (Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS) && odlOffloadThresholdInSeconds != -1) { + nsOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(odlOffloadThresholdInSeconds); + } + // Since the thresholds are moved into "nsPolicies.offload_policies", remove the old fields. + nsPolicies.offload_deletion_lag_ms = null; + nsPolicies.offload_threshold = -1; + nsPolicies.offload_threshold_in_seconds = -1; + } + protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPoliciesImpl offloadPolicies) { validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); @@ -2359,19 +2403,8 @@ public abstract class NamespacesBase extends AdminResource { try { namespaceResources().setPoliciesAsync(namespaceName, policies -> { - if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), - OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) { - offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms); - } else { - policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(); - } - if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), - OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) { - offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold); - } else { - policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes(); - } policies.offload_policies = offloadPolicies; + mergeOffloadThresholdsForCompatibility(policies); return policies; }).thenApply(r -> { log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), @@ -2397,7 +2430,12 @@ public abstract class NamespacesBase extends AdminResource { validatePoliciesReadOnlyAccess(); try { namespaceResources().setPoliciesAsync(namespaceName, (policies) -> { + // Remove new offload policies. policies.offload_policies = null; + // Remove the old offload policies thresholds. + policies.offload_deletion_lag_ms = null; + policies.offload_threshold = -1; + policies.offload_threshold_in_seconds = -1; return policies; }).thenApply(r -> { log.info("[{}] Successfully remove offload configuration: namespace={}", clientAppId(), namespaceName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 38a8a374e2f..a4b3ec2aa80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -125,11 +125,14 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -3816,6 +3819,54 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { Assert.assertTrue(permissions22.isEmpty()); } + @Test + public void testOverridesNamespaceOffloadThreshold() throws Exception { + String namespace = BrokerTestUtil.newUniqueName(this.defaultTenant + "/ns"); + String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setDispatchRate(topic, DispatchRate.builder().dispatchThrottlingRateInMsg(1).build()); + // assert we get -1 which indicates it will fall back to default + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); + // Set namespace level offloading threshold. + long m1 = 1024 * 1024; + long h1 = 1000 * 3600; + OffloadPoliciesImpl policies = OffloadPoliciesImpl.builder() + .managedLedgerOffloadDriver("S3") + .s3ManagedLedgerOffloadBucket("bucket-1") + .managedLedgerOffloadThresholdInBytes(m1) + .managedLedgerOffloadThresholdInSeconds(h1) + .build(); + admin.namespaces().setOffloadPolicies(namespace, policies); + OffloadPolicies policies1 = admin.namespaces().getOffloadPolicies(namespace); + assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), m1); + assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), h1); + + long m2 = 2 * 1024 * 1024L; + long h2 = 2 * 1000 * 3600; + admin.namespaces().setOffloadThreshold(namespace, m2); + admin.namespaces().setOffloadThresholdInSeconds(namespace, h2); + OffloadPolicies policies2 = admin.namespaces().getOffloadPolicies(namespace); + assertEquals(policies2.getManagedLedgerOffloadThresholdInBytes(), m2); + assertEquals(policies2.getManagedLedgerOffloadThresholdInSeconds(), h2); + OffloadPolicies policies3 = admin.topicPolicies().getOffloadPolicies(topic, true); + assertEquals(policies3.getManagedLedgerOffloadThresholdInBytes(), m2); + assertEquals(policies3.getManagedLedgerOffloadThresholdInSeconds(), h2); + + admin.namespaces().removeOffloadPolicies(namespace); + OffloadPolicies policies4 = admin.namespaces().getOffloadPolicies(namespace); + assertTrue(policies4 == null); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); + OffloadPolicies policies5 = admin.topicPolicies().getOffloadPolicies(topic, true); + assertTrue(policies5 == null); + + // cleanup. + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } + @Test public void testDeletePatchyPartitionedTopic() throws Exception { final String topic = BrokerTestUtil.newUniqueName(defaultNamespace + "/tp");