This is an automated email from the ASF dual-hosted git repository.
coderzc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d22970d58e7 [fix][broker] Use effective offload policies for extra
configs (#25781)
d22970d58e7 is described below
commit d22970d58e793641ce55373d6eb6ca6225edd006
Author: Cong Zhao <[email protected]>
AuthorDate: Mon May 18 16:37:37 2026 +0800
[fix][broker] Use effective offload policies for extra configs (#25781)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 ++-
.../pulsar/broker/admin/AdminApiOffloadTest.java | 46 ++++++++++++++++++++++
.../common/policies/data/OffloadPoliciesImpl.java | 27 ++++++++++++-
.../common/policies/data/OffloadPoliciesTest.java | 22 ++++++++++-
4 files changed, 98 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4820a61eebe..1d0ad7ab600 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -3167,8 +3167,12 @@ public abstract class NamespacesBase extends
AdminResource {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
+ OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl)
policies.offload_policies;
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(null,
+
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies),
+ pulsar().getConfig().getProperties());
LedgerOffloader managedLedgerOffloader = pulsar()
- .getManagedLedgerOffloader(namespaceName,
(OffloadPoliciesImpl) policies.offload_policies);
+ .getManagedLedgerOffloader(namespaceName, offloadPolicies);
String localClusterName = pulsar().getConfiguration().getClusterName();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 4b4eb2ff4fe..8966f4803da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -37,8 +37,12 @@
*/
package org.apache.pulsar.broker.admin;
+import static
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -56,6 +60,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +85,7 @@ import
org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
+import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -118,6 +126,44 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Test
+ public void testScanOffloadedLedgersUsesMergedNamespaceOffloadPolicies()
throws Exception {
+
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadDriver",
"aws-s3");
+
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadBucket",
"broker-bucket");
+
pulsar.getConfig().getProperties().setProperty("managedLedgerOffloadRegion",
"us-east-1");
+ pulsar.getConfig().getProperties().setProperty(
+ EXTRA_CONFIG_PREFIX + "tieredStorageBucketPrefix",
"broker-prefix");
+
+ admin.namespaces().setOffloadThreshold(myNamespace, 1024L);
+
+ LedgerOffloader offloader = mock(LedgerOffloader.class);
+ doReturn(Map.of()).when(offloader).getOffloadDriverMetadata();
+ doNothing().when(offloader).scanLedgers(any(), anyMap());
+ ArgumentCaptor<OffloadPoliciesImpl> offloadPoliciesCaptor =
ArgumentCaptor.forClass(OffloadPoliciesImpl.class);
+
doReturn(offloader).when(pulsar).getManagedLedgerOffloader(eq(NamespaceName.get(myNamespace)),
+ offloadPoliciesCaptor.capture());
+
+ Client client = ClientBuilder.newClient();
+ try {
+ try (Response response =
client.target(pulsar.getWebServiceAddress()
+ + "/admin/v2/namespaces/" + myNamespace +
"/scanOffloadedLedgers").request().get()) {
+ assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
+ response.readEntity(String.class);
+ }
+ } finally {
+ client.close();
+ }
+
+ OffloadPoliciesImpl offloadPolicies = offloadPoliciesCaptor.getValue();
+ assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(),
"aws-s3");
+ assertEquals(offloadPolicies.getManagedLedgerOffloadBucket(),
"broker-bucket");
+ assertEquals(offloadPolicies.getManagedLedgerOffloadRegion(),
"us-east-1");
+
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
Long.valueOf(1024L));
+ assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+ Map.of("tieredStorageBucketPrefix", "broker-prefix"));
+ verify(offloader).scanLedgers(any(), anyMap());
+ }
+
private void testOffload(String topicName, String mlName) throws Exception
{
LedgerOffloader offloader = mock(LedgerOffloader.class);
when(offloader.getOffloadDriverName()).thenReturn("mock");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index ce1c369897f..14f89ec156e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -429,7 +429,10 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
for (Field field : CONFIGURATION_FIELDS) {
Object object;
- if (topicLevelPolicies != null &&
field.get(topicLevelPolicies) != null) {
+ if
(field.getName().equals("managedLedgerExtraConfigurations")) {
+ object =
mergeManagedLedgerExtraConfigurations(topicLevelPolicies, nsLevelPolicies,
+ brokerProperties);
+ } else if (topicLevelPolicies != null &&
field.get(topicLevelPolicies) != null) {
object = field.get(topicLevelPolicies);
} else if (nsLevelPolicies != null &&
field.get(nsLevelPolicies) != null) {
object = field.get(nsLevelPolicies);
@@ -454,6 +457,28 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
}
}
+ private static Map<String, String>
mergeManagedLedgerExtraConfigurations(OffloadPoliciesImpl topicLevelPolicies,
+
OffloadPoliciesImpl nsLevelPolicies,
+
Properties brokerProperties) {
+ Map<String, String> mergedExtraConfigurations = new HashMap<>();
+ putAllExtraConfigurations(mergedExtraConfigurations,
getExtraConfigurations(brokerProperties));
+ if (nsLevelPolicies != null) {
+ putAllExtraConfigurations(mergedExtraConfigurations,
+ nsLevelPolicies.getManagedLedgerExtraConfigurations());
+ }
+ if (topicLevelPolicies != null) {
+ putAllExtraConfigurations(mergedExtraConfigurations,
+ topicLevelPolicies.getManagedLedgerExtraConfigurations());
+ }
+ return mergedExtraConfigurations.isEmpty() ? null :
mergedExtraConfigurations;
+ }
+
+ private static void putAllExtraConfigurations(Map<String, String> target,
Map<String, String> extraConfigurations) {
+ if (extraConfigurations != null && !extraConfigurations.isEmpty()) {
+ target.putAll(extraConfigurations);
+ }
+ }
+
/**
* Make configurations of the OffloadPolicies compatible with the config
file.
*
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index a8216e8f8a9..6755cbec9a9 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -353,16 +353,36 @@ public class OffloadPoliciesTest {
Properties brokerProperties = new Properties();
brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
brokerProperties.setProperty(EXTRA_CONFIG_PREFIX +
"tieredStorageBucketPrefix", "broker-prefix");
+ brokerProperties.setProperty(EXTRA_CONFIG_PREFIX + "brokerOnly",
"broker-value");
OffloadPoliciesImpl topicLevelPolicies = new OffloadPoliciesImpl();
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("tieredStorageBucketPrefix",
"topic-prefix");
+
topicLevelPolicies.getManagedLedgerExtraConfigurations().put("topicOnly",
"topic-value");
OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(topicLevelPolicies,
null, brokerProperties);
Assert.assertNotNull(offloadPolicies);
assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
- Map.of("tieredStorageBucketPrefix", "topic-prefix"));
+ Map.of("tieredStorageBucketPrefix", "topic-prefix",
+ "brokerOnly", "broker-value",
+ "topicOnly", "topic-value"));
+ }
+
+ @Test
+ public void
emptyHigherLevelExtraConfigInheritsBrokerExtraConfigMergeTest() {
+ Properties brokerProperties = new Properties();
+ brokerProperties.setProperty("managedLedgerOffloadDriver", "aws-s3");
+ brokerProperties.setProperty(EXTRA_CONFIG_PREFIX +
"tieredStorageBucketPrefix", "broker-prefix");
+
+ OffloadPoliciesImpl nsLevelPolicies = new OffloadPoliciesImpl();
+
+ OffloadPoliciesImpl offloadPolicies =
+ OffloadPoliciesImpl.mergeConfiguration(null, nsLevelPolicies,
brokerProperties);
+
+ Assert.assertNotNull(offloadPolicies);
+ assertEquals(offloadPolicies.getManagedLedgerExtraConfigurations(),
+ Map.of("tieredStorageBucketPrefix", "broker-prefix"));
}
@Test