This is an automated email from the ASF dual-hosted git repository.

coderzc pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 814668f5832 [fix][broker] Use effective offload policies for extra configs (#25781)
814668f5832 is described below

commit 814668f58325c91c1ea1de82144539e0306bce74
Author: Cong Zhao <[email protected]>
AuthorDate: Mon May 18 16:37:37 2026 +0800

    [fix][broker] Use effective offload policies for extra configs (#25781)
    
    (cherry picked from commit d22970d58e793641ce55373d6eb6ca6225edd006)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  6 ++-
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 46 ++++++++++++++++++++++
 .../common/policies/data/OffloadPoliciesImpl.java  | 27 ++++++++++++-
 .../common/policies/data/OffloadPoliciesTest.java  | 22 ++++++++++-
 4 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 1e99aa0716b..7b5a4fbb005 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2929,8 +2929,12 @@ public abstract class NamespacesBase extends AdminResource {
         validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, 
PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
+        OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) 
policies.offload_policies;
+        OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.mergeConfiguration(null,
+                
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies),
+                pulsar().getConfig().getProperties());
         LedgerOffloader managedLedgerOffloader = pulsar()
-                .getManagedLedgerOffloader(namespaceName, 
(OffloadPoliciesImpl) policies.offload_policies);
+                .getManagedLedgerOffloader(namespaceName, offloadPolicies);
 
         String localClusterName = pulsar().getConfiguration().getClusterName();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index cadfd759c15..18c2861354b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -37,8 +37,12 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static 
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -56,6 +60,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +85,7 @@ import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -118,6 +126,44 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
+    public void testScanOffloadedLedgersUsesMergedNamespaceOffloadPolicies() 
throws Exception {
+        
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadDriver", 
"aws-s3");
+        
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadBucket", 
"broker-bucket");
+        
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadRegion", 
"us-east-1");
+        pulsar.getConfig().getProperties().setProperty(
+                EXTRA_CONFIG_PREFIX + "tieredStorageBucketPrefix", 
"broker-prefix");
+
+        admin.namespaces().setOffloadThreshold(myNamespace, 1024L);
+
+        LedgerOffloader offloader = mock(LedgerOffloader.class);
+        doReturn(Map.of()).when(offloader).getOffloadDriverMetadata();
+        doNothing().when(offloader).scanLedgers(any(), anyMap());
+        ArgumentCaptor<OffloadPoliciesImpl> offloadPoliciesCaptor = 
ArgumentCaptor.forClass(OffloadPoliciesImpl.class);
+        
doReturn(offloader).when(pulsar).getManagedLedgerOffloader(eq(NamespaceName.get(myNamespace)),
+                offloadPoliciesCaptor.capture());
+
+        Client client = ClientBuilder.newClient();
+        try {
+            try (Response response = 
client.target(pulsar.getWebServiceAddress()
+                    + "/admin/v2/namespaces/" + myNamespace + 
"/scanOffloadedLedgers").request().get()) {
+                assertEquals(response.getStatus(), 
Response.Status.OK.getStatusCode());
+                response.readEntity(String.class);
+            }
+        } finally {
+            client.close();
+        }
+
+        OffloadPoliciesImpl offloadPolicies = offloadPoliciesCaptor.getValue();
+        assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), 
"aws-s3");
+        assertEquals(offloadPolicies.getManagedLedgerOffloadBucket(), 
"broker-bucket");
+        assertEquals(offloadPolicies.getManagedLedgerOffloadRegion(), 
"us-east-1");
+        
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), 
Long.valueOf(1024L));
+        assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+                Map.of("tieredStorageBucketPrefix", "broker-prefix"));
+        verify(offloader).scanLedgers(any(), anyMap());
+    }
+
     private void testOffload(String topicName, String mlName) throws Exception 
{
         LedgerOffloader offloader = mock(LedgerOffloader.class);
         when(offloader.getOffloadDriverName()).thenReturn("mock");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 93f090c9ddf..c5579d1008c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -428,7 +428,10 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
             OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
             for (Field field : CONFIGURATION_FIELDS) {
                 Object object;
-                if (topicLevelPolicies != null && 
field.get(topicLevelPolicies) != null) {
+                if 
(field.getName().equals("managedLedgerExtraConfigurations")) {
+                    object = 
mergeManagedLedgerExtraConfigurations(topicLevelPolicies, nsLevelPolicies,
+                            brokerProperties);
+                } else if (topicLevelPolicies != null && 
field.get(topicLevelPolicies) != null) {
                     object = field.get(topicLevelPolicies);
                 } else if (nsLevelPolicies != null && 
field.get(nsLevelPolicies) != null) {
                     object = field.get(nsLevelPolicies);
@@ -453,6 +456,28 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
         }
     }
 
+    private static Map<String, String> 
mergeManagedLedgerExtraConfigurations(OffloadPoliciesImpl topicLevelPolicies,
+                                                                            
OffloadPoliciesImpl nsLevelPolicies,
+                                                                            
Properties brokerProperties) {
+        Map<String, String> mergedExtraConfigurations = new HashMap<>();
+        putAllExtraConfigurations(mergedExtraConfigurations, 
getExtraConfigurations(brokerProperties));
+        if (nsLevelPolicies != null) {
+            putAllExtraConfigurations(mergedExtraConfigurations,
+                    nsLevelPolicies.getManagedLedgerExtraConfigurations());
+        }
+        if (topicLevelPolicies != null) {
+            putAllExtraConfigurations(mergedExtraConfigurations,
+                    topicLevelPolicies.getManagedLedgerExtraConfigurations());
+        }
+        return mergedExtraConfigurations.isEmpty() ? null : 
mergedExtraConfigurations;
+    }
+
+    private static void putAllExtraConfigurations(Map<String, String> target, 
Map<String, String> extraConfigurations) {
+        if (extraConfigurations != null && !extraConfigurations.isEmpty()) {
+            target.putAll(extraConfigurations);
+        }
+    }
+
     /**
      * Make configurations of the OffloadPolicies compatible with the config 
file.
      *
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 0662be3f054..b3b02e46a58 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -353,16 +353,36 @@ public class OffloadPoliciesTest {
         Properties brokerProperties = new Properties();
         brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
         brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + 
"tieredStorageBucketPrefix", "broker-prefix");
+        brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + "brokerOnly", 
"broker-value");
 
         OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
         
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix",
 "topic-prefix");
+        
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("topicOnly", 
"topic-value");
 
         OffloadPoliciesImpl offloadPolicies =
                 OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies, 
null, brokerProperties);
 
         Assert.assertNotNull(offloadPolicies);
         assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
-                Map.of("tieredStorageBucketPrefix", "topic-prefix"));
+                Map.of("tieredStorageBucketPrefix", "topic-prefix",
+                        "brokerOnly", "broker-value",
+                        "topicOnly", "topic-value"));
+    }
+
+    @Test
+    public void 
emptyHigherLevelExtraConfigInheritsBrokerExtraConfigMergeTest() {
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+        brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + 
"tieredStorageBucketPrefix", "broker-prefix");
+
+        OffloadPoliciesImpl nsLevelPolicies = new OffloadPoliciesImpl();
+
+        OffloadPoliciesImpl offloadPolicies =
+                OffloadPoliciesImpl.mergeConfiguration(null, nsLevelPolicies, 
brokerProperties);
+
+        Assert.assertNotNull(offloadPolicies);
+        assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+                Map.of("tieredStorageBucketPrefix", "broker-prefix"));
     }
 
     @Test

Reply via email to