This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0f15a3667e1566b7f0b646061cdef19a6fd3775a
Author: hrzzzz <[email protected]>
AuthorDate: Fri Dec 20 20:04:24 2024 +0800

    [fix][broker] Fix bug causing loss of migrated information when setting other localPolicies in namespace (#23764)
    
    Co-authored-by: ruihongzhou <[email protected]>
    (cherry picked from commit bbe2cabc3ec0375607cb12665cab0b4745dbd36e)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 20 ++++++++++------
 .../pulsar/common/naming/NamespaceBundles.java     |  3 ++-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 27 ++++++++++++++++++++++
 .../pulsar/common/policies/data/LocalPolicies.java | 11 ++++++++-
 4 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d80e2487b4f..ca4c685b280 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -960,7 +960,8 @@ public abstract class NamespacesBase extends AdminResource {
                 LocalPolicies localPolicies = oldPolicies.map(
                         policies -> new LocalPolicies(policies.bundles,
                                 bookieAffinityGroup,
-                                policies.namespaceAntiAffinityGroup))
+                                policies.namespaceAntiAffinityGroup,
+                                policies.migrated))
                         .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
                                 bookieAffinityGroup,
                                 null));
@@ -1779,7 +1780,8 @@ public abstract class NamespacesBase extends 
AdminResource {
             getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
                 lp.map(policies -> new LocalPolicies(policies.bundles,
                         policies.bookieAffinityGroup,
-                        antiAffinityGroup))
+                        antiAffinityGroup,
+                        policies.migrated))
                         .orElseGet(() -> new LocalPolicies(defaultBundle(),
                                 null, antiAffinityGroup))
             );
@@ -1816,7 +1818,8 @@ public abstract class NamespacesBase extends 
AdminResource {
             getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
                 new LocalPolicies(policies.bundles,
                         policies.bookieAffinityGroup,
-                        null));
+                        null,
+                        policies.migrated));
             log.info("[{}] Successfully removed anti-affinity group for a 
namespace={}", clientAppId(), namespaceName);
         } catch (Exception e) {
             log.error("[{}] Failed to remove anti-affinity group for namespace 
{}", clientAppId(), namespaceName, e);
@@ -2765,10 +2768,13 @@ public abstract class NamespacesBase extends 
AdminResource {
     protected void internalEnableMigration(boolean migrated) {
         validateSuperUserAccess();
         try {
-            getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
-                policies.migrated = migrated;
-                return policies;
-            });
+            getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, 
oldPolicies -> oldPolicies.map(
+                    policies -> new LocalPolicies(policies.bundles,
+                            policies.bookieAffinityGroup,
+                            policies.namespaceAntiAffinityGroup,
+                            migrated))
+                    .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
+                            null, null, migrated)));
             log.info("Successfully updated migration on namespace {}", 
namespaceName);
         } catch (Exception e) {
             log.error("Failed to update migration on namespace {}", 
namespaceName, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index fa7baeaa606..3ee365cdd45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -197,6 +197,7 @@ public class NamespaceBundles {
     public LocalPolicies toLocalPolicies() {
         return new LocalPolicies(this.getBundlesData(),
                 localPolicies.map(lp -> 
lp.getLeft().bookieAffinityGroup).orElse(null),
-                localPolicies.map(lp -> 
lp.getLeft().namespaceAntiAffinityGroup).orElse(null));
+                localPolicies.map(lp -> 
lp.getLeft().namespaceAntiAffinityGroup).orElse(null),
+                localPolicies.map(lp -> lp.getLeft().migrated).orElse(false));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f2948660952..18cc449d15d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -2195,4 +2196,30 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         admin.namespaces().deleteNamespace(namespace);
     }
+
+    public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() 
throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
+        admin.namespaces().createNamespace(namespace, 
Set.of(testLocalCluster));
+
+        admin.namespaces().updateMigrationState(namespace, true);
+        assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+        String bookieAffinityGroupPrimary = "group1";
+        admin.namespaces().setBookieAffinityGroup(namespace,
+                
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build());
+        
assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(),
+                bookieAffinityGroupPrimary);
+        assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+        String namespaceAntiAffinityGroup = "group2";
+        admin.namespaces().setNamespaceAntiAffinityGroup(namespace, 
namespaceAntiAffinityGroup);
+        
assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), 
namespaceAntiAffinityGroup);
+        assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+        admin.namespaces().deleteBookieAffinityGroup(namespace);
+        assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+        admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+        assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index 3b17dbe067e..43f5130eb9f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -34,20 +34,29 @@ public class LocalPolicies {
     public final BookieAffinityGroupData bookieAffinityGroup;
     // namespace anti-affinity-group
     public final String namespaceAntiAffinityGroup;
-    public boolean migrated;
+    public final boolean migrated;
 
     public LocalPolicies() {
         bundles = defaultBundle();
         bookieAffinityGroup = null;
         namespaceAntiAffinityGroup = null;
+        migrated = false;
     }
 
     public LocalPolicies(BundlesData data,
                          BookieAffinityGroupData bookieAffinityGroup,
                          String namespaceAntiAffinityGroup) {
+        this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false);
+    }
+
+    public LocalPolicies(BundlesData data,
+                         BookieAffinityGroupData bookieAffinityGroup,
+                         String namespaceAntiAffinityGroup,
+                         boolean migrated) {
         bundles = data;
         this.bookieAffinityGroup = bookieAffinityGroup;
         this.namespaceAntiAffinityGroup = namespaceAntiAffinityGroup;
+        this.migrated = migrated;
     }
 
 }
\ No newline at end of file

Reply via email to