This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0f15a3667e1566b7f0b646061cdef19a6fd3775a Author: hrzzzz <[email protected]> AuthorDate: Fri Dec 20 20:04:24 2024 +0800 [fix][broker] Fix bug causing loss of migrated information when setting other localPolicies in namespace (#23764) Co-authored-by: ruihongzhou <[email protected]> (cherry picked from commit bbe2cabc3ec0375607cb12665cab0b4745dbd36e) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 20 ++++++++++------ .../pulsar/common/naming/NamespaceBundles.java | 3 ++- .../apache/pulsar/broker/admin/NamespacesTest.java | 27 ++++++++++++++++++++++ .../pulsar/common/policies/data/LocalPolicies.java | 11 ++++++++- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d80e2487b4f..ca4c685b280 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -960,7 +960,8 @@ public abstract class NamespacesBase extends AdminResource { LocalPolicies localPolicies = oldPolicies.map( policies -> new LocalPolicies(policies.bundles, bookieAffinityGroup, - policies.namespaceAntiAffinityGroup)) + policies.namespaceAntiAffinityGroup, + policies.migrated)) .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), bookieAffinityGroup, null)); @@ -1779,7 +1780,8 @@ public abstract class NamespacesBase extends AdminResource { getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)-> lp.map(policies -> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, - antiAffinityGroup)) + antiAffinityGroup, + policies.migrated)) .orElseGet(() -> new LocalPolicies(defaultBundle(), null, antiAffinityGroup)) ); @@ -1816,7 +1818,8 @@ public abstract class NamespacesBase extends AdminResource { getLocalPolicies().setLocalPolicies(namespaceName, (policies)-> new LocalPolicies(policies.bundles, policies.bookieAffinityGroup, - null)); + null, + policies.migrated)); log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); } catch (Exception e) { log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); @@ -2765,10 +2768,13 @@ public abstract class NamespacesBase extends AdminResource { protected void internalEnableMigration(boolean migrated) { validateSuperUserAccess(); try { - getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> { - policies.migrated = migrated; - return policies; - }); + getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> oldPolicies.map( + policies -> new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, + policies.namespaceAntiAffinityGroup, + migrated)) + .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()), + null, null, migrated))); log.info("Successfully updated migration on namespace {}", namespaceName); } catch (Exception e) { log.error("Failed to update migration on namespace {}", namespaceName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index fa7baeaa606..3ee365cdd45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -197,6 +197,7 @@ public class NamespaceBundles { public LocalPolicies toLocalPolicies() { return new LocalPolicies(this.getBundlesData(), localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null), - localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null)); + localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null), + localPolicies.map(lp -> lp.getLeft().migrated).orElse(false)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index f2948660952..18cc449d15d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -102,6 +102,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; @@ -2195,4 +2196,30 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace(namespace); } + + public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() throws Exception { + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); + admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster)); + + admin.namespaces().updateMigrationState(namespace, true); + assertTrue(admin.namespaces().getPolicies(namespace).migrated); + + String bookieAffinityGroupPrimary = "group1"; + admin.namespaces().setBookieAffinityGroup(namespace, + BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build()); + assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(), + bookieAffinityGroupPrimary); + assertTrue(admin.namespaces().getPolicies(namespace).migrated); + + String namespaceAntiAffinityGroup = "group2"; + admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup); + assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), namespaceAntiAffinityGroup); + assertTrue(admin.namespaces().getPolicies(namespace).migrated); + + admin.namespaces().deleteBookieAffinityGroup(namespace); + assertTrue(admin.namespaces().getPolicies(namespace).migrated); + + admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); + assertTrue(admin.namespaces().getPolicies(namespace).migrated); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java index 3b17dbe067e..43f5130eb9f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java @@ -34,20 +34,29 @@ public class LocalPolicies { public final BookieAffinityGroupData bookieAffinityGroup; // namespace anti-affinity-group public final String namespaceAntiAffinityGroup; - public boolean migrated; + public final boolean migrated; public LocalPolicies() { bundles = defaultBundle(); bookieAffinityGroup = null; namespaceAntiAffinityGroup = null; + migrated = false; } public LocalPolicies(BundlesData data, BookieAffinityGroupData bookieAffinityGroup, String namespaceAntiAffinityGroup) { + this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false); + } + + public LocalPolicies(BundlesData data, + BookieAffinityGroupData bookieAffinityGroup, + String namespaceAntiAffinityGroup, + boolean migrated) { bundles = data; this.bookieAffinityGroup = bookieAffinityGroup; this.namespaceAntiAffinityGroup = namespaceAntiAffinityGroup; + this.migrated = migrated; } } \ No newline at end of file
