This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43dccccb92b6db46586a51d18c157327e6c071a0
Author: LinChen <[email protected]>
AuthorDate: Sun Oct 9 21:24:24 2022 +0800

    [fix][broker] Update new bundle-range to  policies after bundle split 
(#17797)
    
    Co-authored-by: leolinchen <[email protected]>
---
 .../pulsar/broker/namespace/NamespaceService.java  | 42 +++++++++++++++++++---
 .../broker/namespace/NamespaceServiceTest.java     | 11 ++++--
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5c288af7b8a..12dc31f3428 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -89,6 +90,7 @@ import 
org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -867,11 +869,12 @@ public class NamespaceService implements AutoCloseable {
                                     for (NamespaceBundle sBundle : 
splittedBundles.getRight()) {
                                         
checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                     }
-                                    updateNamespaceBundles(nsname, 
splittedBundles.getLeft())
-                                            .thenRun(() -> {
-                                                
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-                                                
updateFuture.complete(splittedBundles.getRight());
-                                            }).exceptionally(ex1 -> {
+                                    updateNamespaceBundles(nsname, 
splittedBundles.getLeft()).thenCompose(__ -> {
+                                        return 
updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft());
+                                    }).thenRun(() -> {
+                                        
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+                                        
updateFuture.complete(splittedBundles.getRight());
+                                    }).exceptionally(ex1 -> {
                                         String msg = format("failed to update 
namespace policies [%s], "
                                                         + "NamespaceBundle: %s 
due to %s",
                                                 nsname.toString(), 
bundle.getBundleRange(), ex1.getMessage());
@@ -946,6 +949,35 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
+    /**
+     * Writes the bundles data of {@code nsBundles} into the policies of the given namespace.
+     * If no policies are present, an error is logged and new Policies are created instead.
+     *
+     * @param nsname namespace whose policies are updated; must not be null
+     * @param nsBundles bundles whose data is stored in the policies; must not be null
+     * @return future returned by the underlying set/create policies call
+     */
+    private CompletableFuture<Void> 
updateNamespaceBundlesForPolicies(NamespaceName nsname,
+                                                                      
NamespaceBundles nsBundles) {
+        Objects.requireNonNull(nsname);
+        Objects.requireNonNull(nsBundles);
+
+        return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies
 -> {
+            if (policies.isPresent()) {
+                return 
pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(nsname, 
oldPolicies -> {
+                    oldPolicies.bundles = nsBundles.getBundlesData();
+                    return oldPolicies;
+                });
+            } else {
+                LOG.error("Policies of namespace {} is not exist!", nsname);
+                Policies newPolicies = new Policies();
+                newPolicies.bundles = nsBundles.getBundlesData();
+                return 
pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(nsname, 
newPolicies);
+            }
+        });
+    }
+
+
     /**
      * Update new bundle-range to LocalZk (create a new node if not present).
      * Update may fail because of concurrent write to Zookeeper.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 2b982d2e059..fe3976c6d1d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -159,11 +159,16 @@ public class NamespaceServiceTest extends BrokerTestBase {
         splitBundleSet.removeAll(bundleList);
         assertTrue(splitBundleSet.isEmpty());
 
-        // (2) validate LocalZookeeper policies updated with newly created 
split
+        // (2) validate that localPolicies and policies are updated with newly created 
split
         // bundles
-        LocalPolicies policies = 
pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
-        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, 
policies.bundles);
+        LocalPolicies localPolicies = 
pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
+        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, 
localPolicies.bundles);
         assertEquals(localZkBundles, updatedNsBundles);
+        log.info("LocalPolicies: {}", localPolicies);
+
+        Policies policies = 
pulsar.getPulsarResources().getNamespaceResources().getPolicies(nsname).get();
+        NamespaceBundles zkBundles = bundleFactory.getBundles(nsname, 
policies.bundles);
+        assertEquals(zkBundles, updatedNsBundles);
         log.info("Policies: {}", policies);
 
         // (3) validate ownership of new split bundles by local owner

Reply via email to