This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 462c7176a68 [fix][admin] Set local policies overwrites "number of
bundles" passed during namespace creation (#24762)
462c7176a68 is described below
commit 462c7176a68c0c14a4df75a1858891fdf30e4e5c
Author: Oneby <[email protected]>
AuthorDate: Tue Nov 4 21:46:48 2025 +0800
[fix][admin] Set local policies overwrites "number of bundles" passed
during namespace creation (#24762)
Co-authored-by: oneby-wang <[email protected]>
(cherry picked from commit 3983ff012dbf97a656dd99f68b63b79bdf6f1602)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 26 +++++-
.../pulsar/broker/admin/NamespacesV2Test.java | 104 +++++++++++++++++++++
.../loadbalance/ExtensibleLoadManagerTest.java | 15 ++-
3 files changed, 140 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0e8d5febb9c..c3974cbaa34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.admin.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
-import static
org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -909,7 +908,7 @@ public abstract class NamespacesBase extends AdminResource {
policies -> new LocalPolicies(policies.bundles,
bookieAffinityGroup,
policies.namespaceAntiAffinityGroup))
- .orElseGet(() -> new
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
+ .orElseGet(() -> new
LocalPolicies(getDefaultBundleData(),
bookieAffinityGroup,
null));
log.info("[{}] Successfully updated local-policies
configuration: namespace={}, map={}", clientAppId(),
@@ -920,6 +919,8 @@ public abstract class NamespacesBase extends AdminResource {
log.warn("[{}] Failed to update local-policy configuration for
namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ } catch (RestException re) {
+ throw re;
} catch (Exception e) {
log.error("[{}] Failed to update local-policy configuration for
namespace {}", clientAppId(), namespaceName,
e);
@@ -1735,11 +1736,13 @@ public abstract class NamespacesBase extends
AdminResource {
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup))
- .orElseGet(() -> new LocalPolicies(defaultBundle(),
+ .orElseGet(() -> new
LocalPolicies(getDefaultBundleData(),
null, antiAffinityGroup))
);
log.info("[{}] Successfully updated local-policies configuration:
namespace={}, map={}", clientAppId(),
namespaceName, antiAffinityGroup);
+ } catch (RestException re) {
+ throw re;
} catch (Exception e) {
log.error("[{}] Failed to update local-policy configuration for
namespace {}", clientAppId(), namespaceName,
e);
@@ -2785,5 +2788,22 @@ public abstract class NamespacesBase extends
AdminResource {
.thenApply(policies -> policies.allowed_clusters);
}
+ // TODO remove this sync method after async refactor
+ private BundlesData getDefaultBundleData() {
+ try {
+ return
getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
+ TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get namespace-policy configuration for
namespace {}", clientAppId(),
+ namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+
+ private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
+ return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
+ optionalPolicies -> optionalPolicies.isPresent() ?
optionalPolicies.get().bundles :
+
getBundles(config().getDefaultNumberOfNamespaceBundles()));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
index cec30762194..0e4ede7ae30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -38,8 +39,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
+import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -196,4 +200,104 @@ public class NamespacesV2Test extends
MockedPulsarServiceBaseTest {
this.testTenant, this.testNamespace));
assertTrue(Objects.isNull(dispatchRate));
}
+
+ @Test
+ public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception
{
+ // 1. create namespace with empty policies
+ String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, setBookieAffinityGroupNs, null));
+
+ // 2.set bookie affinity group
+ String primaryAffinityGroup = "primary-affinity-group";
+ String secondaryAffinityGroup = "secondary-affinity-group";
+ BookieAffinityGroupData bookieAffinityGroupDataReq =
+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+ namespaces.setBookieAffinityGroup(testTenant,
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+ // 3.query namespace num bundles, should be
conf.getDefaultNumberOfNamespaceBundles()
+ BundlesData bundlesData = (BundlesData) asyncRequests(
+ response -> namespaces.getBundlesData(response, testTenant,
setBookieAffinityGroupNs));
+ assertEquals(bundlesData.getNumBundles(),
conf.getDefaultNumberOfNamespaceBundles());
+
+ // 4.assert namespace bookie affinity group
+ BookieAffinityGroupData bookieAffinityGroupDataResp =
+ namespaces.getBookieAffinityGroup(testTenant,
setBookieAffinityGroupNs);
+ assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+ }
+
+ @Test
+ public void testSetBookieAffinityGroupWithExistBundlePolicies() throws
Exception {
+ // 1. create namespace with specified num bundles
+ String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
+ Policies policies = new Policies();
+ policies.bundles = getBundles(10);
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, setBookieAffinityGroupNs, policies));
+
+ // 2.set bookie affinity group
+ String primaryAffinityGroup = "primary-affinity-group";
+ String secondaryAffinityGroup = "secondary-affinity-group";
+ BookieAffinityGroupData bookieAffinityGroupDataReq =
+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
+ namespaces.setBookieAffinityGroup(testTenant,
setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
+
+ // 3.query namespace num bundles, should be policies.bundles, which we
set before
+ BundlesData bundlesData = (BundlesData) asyncRequests(
+ response -> namespaces.getBundlesData(response, testTenant,
setBookieAffinityGroupNs));
+ assertEquals(bundlesData, policies.bundles);
+
+ // 4.assert namespace bookie affinity group
+ BookieAffinityGroupData bookieAffinityGroupDataResp =
+ namespaces.getBookieAffinityGroup(testTenant,
setBookieAffinityGroupNs);
+ assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
+ }
+
+ @Test
+ public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws
Exception {
+ // 1. create namespace with empty policies
+ String setNamespaceAntiAffinityGroupNs =
"test-set-namespace-anti-affinity-group-ns";
+ asyncRequests(
+ response -> namespaces.createNamespace(response, testTenant,
setNamespaceAntiAffinityGroupNs, null));
+
+ // 2.set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ namespaces.setNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs,
+ namespaceAntiAffinityGroupReq);
+
+ // 3.query namespace num bundles, should be
conf.getDefaultNumberOfNamespaceBundles()
+ BundlesData bundlesData = (BundlesData) asyncRequests(
+ response -> namespaces.getBundlesData(response, testTenant,
setNamespaceAntiAffinityGroupNs));
+ assertEquals(bundlesData.getNumBundles(),
conf.getDefaultNumberOfNamespaceBundles());
+
+ // 4.assert namespace anti affinity group
+ String namespaceAntiAffinityGroupResp =
+ namespaces.getNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs);
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+ }
+
+ @Test
+ public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies()
throws Exception {
+ // 1. create namespace with specified num bundles
+ String setNamespaceAntiAffinityGroupNs =
"test-set-namespace-anti-affinity-group-ns";
+ Policies policies = new Policies();
+ policies.bundles = getBundles(10);
+ asyncRequests(response -> namespaces.createNamespace(response,
testTenant, setNamespaceAntiAffinityGroupNs,
+ policies));
+
+ // 2.set namespace anti affinity group
+ String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
+ namespaces.setNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs,
+ namespaceAntiAffinityGroupReq);
+
+ // 3.query namespace num bundles, should be policies.bundles, which we
set before
+ BundlesData bundlesData = (BundlesData) asyncRequests(
+ response -> namespaces.getBundlesData(response, testTenant,
setNamespaceAntiAffinityGroupNs));
+ assertEquals(bundlesData, policies.bundles);
+
+ // 4.assert namespace anti affinity group
+ String namespaceAntiAffinityGroupResp =
+ namespaces.getNamespaceAntiAffinityGroup(testTenant,
setNamespaceAntiAffinityGroupNs);
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroupReq);
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 6da4c739126..13ad557c197 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -288,7 +288,7 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
@Test(timeOut = 40 * 1000)
- public void testAntiaffinityPolicy() throws PulsarAdminException {
+ public void testAntiAffinityPolicy() throws PulsarAdminException {
final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
final String antiAffinityEnabledNameSpace = DEFAULT_TENANT +
"/my-ns-filter" + nsSuffix;
final int numPartition = 20;
@@ -297,14 +297,25 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
assertEquals(activeBrokers.size(), NUM_BROKERS);
+ Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
for (int i = 0; i < activeBrokers.size(); i++) {
String namespace = antiAffinityEnabledNameSpace + "-" + i;
- admin.namespaces().createNamespace(namespace, 10);
+ antiAffinityEnabledNameSpacesReq.add(namespace);
+ admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().setNamespaceAntiAffinityGroup(namespace,
namespaceAntiAffinityGroup);
admin.clusters().createFailureDomain(clusterName,
namespaceAntiAffinityGroup, FailureDomain.builder()
.brokers(Set.of(activeBrokers.get(i))).build());
+ String namespaceAntiAffinityGroupResp =
admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
+ assertEquals(namespaceAntiAffinityGroupResp,
namespaceAntiAffinityGroup);
+ FailureDomain failureDomainResp =
+ admin.clusters().getFailureDomain(clusterName,
namespaceAntiAffinityGroup);
+ assertEquals(failureDomainResp.getBrokers(),
Set.of(activeBrokers.get(i)));
}
+ List<String> antiAffinityNamespacesResp =
+ admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT,
clusterName, namespaceAntiAffinityGroup);
+ assertEquals(new HashSet<>(antiAffinityNamespacesResp),
antiAffinityEnabledNameSpacesReq);
+
Set<String> result = new HashSet<>();
for (int i = 0; i < activeBrokers.size(); i++) {
final String topic = "persistent://" +
antiAffinityEnabledNameSpace + "-" + i +"/topic";