This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 22eeddd8ddc [fix][broker]Fix dirty reading of namespace level offload 
thresholds (#24696)
22eeddd8ddc is described below

commit 22eeddd8ddc7aae85b069328d7a7344866ac3c09
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Sep 4 16:45:22 2025 +0800

    [fix][broker]Fix dirty reading of namespace level offload thresholds 
(#24696)
    
    (cherry picked from commit 96bc370e6b9a10e1656983d522e272a36a44fdae)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 68 +++++++++++++++++-----
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 51 ++++++++++++++++
 2 files changed, 104 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d393a71da72..0e821c83e61 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2182,7 +2182,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                     policies.offload_policies = new OffloadPoliciesImpl();
                 }
                 ((OffloadPoliciesImpl) 
policies.offload_policies).setManagedLedgerOffloadThresholdInBytes(newThreshold);
-                policies.offload_threshold = newThreshold;
+                mergeOffloadThresholdsForCompatibility(policies);
                 return policies;
             });
             log.info("[{}] Successfully updated offloadThreshold 
configuration: namespace={}, value={}",
@@ -2209,7 +2209,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                             }
                             ((OffloadPoliciesImpl) policies.offload_policies)
                                     
.setManagedLedgerOffloadThresholdInSeconds(newThreshold);
-                            policies.offload_threshold_in_seconds = 
newThreshold;
+                            mergeOffloadThresholdsForCompatibility(policies);
                             return policies;
                         })
                 )
@@ -2239,7 +2239,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
                 ((OffloadPoliciesImpl) policies.offload_policies)
                         
.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
-                policies.offload_deletion_lag_ms = newDeletionLagMs;
+                mergeOffloadThresholdsForCompatibility(policies);
                 return policies;
             });
             log.info("[{}] Successfully updated offloadDeletionLagMs 
configuration: namespace={}, value={}",
@@ -2352,6 +2352,50 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
+    /**
+     * Before https://github.com/apache/pulsar/pull/6183, users can set broker 
level offload policies, and handle
+     * namespace-level thresholds by the following fields:
+     *   - {@link Policies#offload_deletion_lag_ms}
+     *   - {@link Policies#offload_threshold}
+     *   - {@link Policies#offload_threshold_in_seconds}
+     *
+     * After https://github.com/apache/pulsar/pull/6183, Pulsar supports 
namespace-level policies, which was
+     * supported by {@link Policies#offload_policies}. And the thresholds were 
moved to the following fields:
+     * - {@link Policies#offload_policies} -> {@link 
OffloadPoliciesImpl#getManagedLedgerOffloadDeletionLagInMillis}
+     * - {@link Policies#offload_policies} -> {@link 
OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInBytes}
+     * - {@link Policies#offload_policies} -> {@link 
OffloadPoliciesImpl#getManagedLedgerOffloadThresholdInSeconds}
+     *
+     * To make the offload policies compatible with the old policies, uses the 
old policies if the new policies
+     * are not set. Once the new fields are set, the old fields will be 
ignored.
+     */
+    private void mergeOffloadThresholdsForCompatibility(Policies nsPolicies) {
+        Long oldOffloadDeletionLagMs = nsPolicies.offload_deletion_lag_ms;
+        Long oldOffloadThresholdInBytes = nsPolicies.offload_threshold;
+        Long odlOffloadThresholdInSeconds = 
nsPolicies.offload_threshold_in_seconds;
+        // If the old values are empty, skip.
+        if (oldOffloadDeletionLagMs == null && oldOffloadThresholdInBytes == 
-1 && odlOffloadThresholdInSeconds == -1) {
+            return;
+        }
+        // If the new values are empty, use the old values.
+        OffloadPoliciesImpl nsOffloadPolicies = (OffloadPoliciesImpl) 
nsPolicies.offload_policies;
+        if 
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
+                OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS) && 
oldOffloadDeletionLagMs != null) {
+            
nsOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(oldOffloadDeletionLagMs);
+        }
+        if 
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
+                OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) && 
oldOffloadThresholdInBytes != -1) {
+            
nsOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(oldOffloadThresholdInBytes);
+        }
+        if 
(Objects.equals(nsOffloadPolicies.getManagedLedgerOffloadThresholdInSeconds(),
+                OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS) && 
odlOffloadThresholdInSeconds != -1) {
+            
nsOffloadPolicies.setManagedLedgerOffloadThresholdInSeconds(odlOffloadThresholdInSeconds);
+        }
+        // Since the thresholds are moved into "nsPolicies.offload_policies", 
remove the old fields.
+        nsPolicies.offload_deletion_lag_ms = null;
+        nsPolicies.offload_threshold = -1;
+        nsPolicies.offload_threshold_in_seconds = -1;
+    }
+
     protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, 
OffloadPoliciesImpl offloadPolicies) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -2359,19 +2403,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
         try {
             namespaceResources().setPoliciesAsync(namespaceName, policies -> {
-                if 
(Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
-                        
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
-                    
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
-                } else {
-                    policies.offload_deletion_lag_ms = 
offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
-                }
-                if 
(Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
-                        
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) {
-                    
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
-                } else {
-                    policies.offload_threshold = 
offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
-                }
                 policies.offload_policies = offloadPolicies;
+                mergeOffloadThresholdsForCompatibility(policies);
                 return policies;
             }).thenApply(r -> {
                 log.info("[{}] Successfully updated offload configuration: 
namespace={}, map={}", clientAppId(),
@@ -2397,7 +2430,12 @@ public abstract class NamespacesBase extends 
AdminResource {
         validatePoliciesReadOnlyAccess();
         try {
             namespaceResources().setPoliciesAsync(namespaceName, (policies) -> 
{
+                // Remove new offload policies.
                 policies.offload_policies = null;
+                // Remove the old offload policies thresholds.
+                policies.offload_deletion_lag_ms = null;
+                policies.offload_threshold = -1;
+                policies.offload_threshold_in_seconds = -1;
                 return policies;
             }).thenApply(r -> {
                 log.info("[{}] Successfully remove offload configuration: 
namespace={}", clientAppId(), namespaceName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 38a8a374e2f..a4b3ec2aa80 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -125,11 +125,14 @@ import 
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -3816,6 +3819,54 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(permissions22.isEmpty());
     }
 
+    @Test
+    public void testOverridesNamespaceOffloadThreshold() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName(this.defaultTenant + 
"/ns");
+        String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setDispatchRate(topic, 
DispatchRate.builder().dispatchThrottlingRateInMsg(1).build());
+        // assert we get -1 which indicates it will fall back to default
+        assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
+        
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);
+        // Set namespace level offloading threshold.
+        long m1 = 1024 * 1024;
+        long h1 = 1000 * 3600;
+        OffloadPoliciesImpl policies = OffloadPoliciesImpl.builder()
+                .managedLedgerOffloadDriver("S3")
+                .s3ManagedLedgerOffloadBucket("bucket-1")
+                .managedLedgerOffloadThresholdInBytes(m1)
+                .managedLedgerOffloadThresholdInSeconds(h1)
+                .build();
+        admin.namespaces().setOffloadPolicies(namespace, policies);
+        OffloadPolicies policies1 = 
admin.namespaces().getOffloadPolicies(namespace);
+        assertEquals(policies1.getManagedLedgerOffloadThresholdInBytes(), m1);
+        assertEquals(policies1.getManagedLedgerOffloadThresholdInSeconds(), 
h1);
+
+        long m2 = 2 * 1024 * 1024L;
+        long h2 = 2 * 1000 * 3600;
+        admin.namespaces().setOffloadThreshold(namespace, m2);
+        admin.namespaces().setOffloadThresholdInSeconds(namespace, h2);
+        OffloadPolicies policies2 = 
admin.namespaces().getOffloadPolicies(namespace);
+        assertEquals(policies2.getManagedLedgerOffloadThresholdInBytes(), m2);
+        assertEquals(policies2.getManagedLedgerOffloadThresholdInSeconds(), 
h2);
+        OffloadPolicies policies3 = 
admin.topicPolicies().getOffloadPolicies(topic, true);
+        assertEquals(policies3.getManagedLedgerOffloadThresholdInBytes(), m2);
+        assertEquals(policies3.getManagedLedgerOffloadThresholdInSeconds(), 
h2);
+
+        admin.namespaces().removeOffloadPolicies(namespace);
+        OffloadPolicies policies4 = 
admin.namespaces().getOffloadPolicies(namespace);
+        assertTrue(policies4 == null);
+        assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
+        
assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1);
+        OffloadPolicies policies5 = 
admin.topicPolicies().getOffloadPolicies(topic, true);
+        assertTrue(policies5 == null);
+
+        // cleanup.
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
     @Test
     public void testDeletePatchyPartitionedTopic() throws Exception {
         final String topic = BrokerTestUtil.newUniqueName(defaultNamespace + 
"/tp");

Reply via email to