This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0bfd38ee3041b9d6be4b81b431cfe4da08dd39f2
Author: Nicklee007 <[email protected]>
AuthorDate: Tue Jan 18 14:55:16 2022 +0800

    Release old bundle from ownership cache when operator splits bundle (#13678)
    
    Fixes #13677
    
    Modifications:
    
    Release the old bundle from the ownership and temporary znode cache when we
    split the old bundle.
    
    (cherry picked from commit 0cd9462802b51b0026c9c641b82861aaaa8f2d9b)
---
 .../pulsar/broker/namespace/NamespaceService.java  |  2 +
 .../broker/namespace/NamespaceServiceTest.java     | 43 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index c0eee85..3c31f71 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -894,6 +894,8 @@ public class NamespaceService implements AutoCloseable {
                             // update bundled_topic cache for 
load-report-generation
                             
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                             loadManager.get().setLoadReportForceUpdateFlag();
+                            // release old bundle from ownership cache
+                            
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
                             completionFuture.complete(null);
                             if (unload) {
                                 // Unload new split bundles, in background. 
This will not
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index bb33ac8..1420bc6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.awaitility.Awaitility;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -491,6 +492,48 @@ public class NamespaceServiceTest extends BrokerTestBase {
         }
     }
 
+
+    @Test
+    public void testSplitBundleAndRemoveOldBundleFromOwnerShipCache() throws 
Exception {
+        OwnershipCache ownershipCache = 
spy(pulsar.getNamespaceService().getOwnershipCache());
+        
doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache).disableOwnership(any(NamespaceBundle.class));
+
+        Field ownership = 
NamespaceService.class.getDeclaredField("ownershipCache");
+        ownership.setAccessible(true);
+        ownership.set(pulsar.getNamespaceService(), ownershipCache);
+
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
+        TopicName topicName = 
TopicName.get("persistent://pulsar/global/ns1/topic-1");
+        NamespaceBundles bundles = 
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+
+        NamespaceBundle splitBundle1 = bundles.findBundle(topicName);
+        ownershipCache.tryAcquiringOwnership(splitBundle1);
+        CompletableFuture<Void> result1 = 
namespaceService.splitAndOwnBundle(splitBundle1, false, 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
+        try {
+            result1.get();
+        } catch (Exception e) {
+            fail("split bundle failed", e);
+        }
+        Awaitility.await().untilAsserted(()
+                -> 
assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle1)));
+
+        //unload split
+        bundles = 
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+        assertNotNull(bundles);
+        NamespaceBundle splitBundle2 = bundles.findBundle(topicName);
+        CompletableFuture<Void> result2 = 
namespaceService.splitAndOwnBundle(splitBundle2, true, 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
+        try {
+            result2.get();
+        } catch (Exception e) {
+            // make sure: NPE does not occur
+            fail("split bundle failed", e);
+        }
+        Awaitility.await().untilAsserted(()
+                -> 
assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle2)));
+    }
+
+
     @Test
     public void testSplitLargestBundle() throws Exception {
         String namespace = "prop/test/ns-abc2";

Reply via email to