This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0bfd38ee3041b9d6be4b81b431cfe4da08dd39f2 Author: Nicklee007 <[email protected]> AuthorDate: Tue Jan 18 14:55:16 2022 +0800 Release old bundle from ownership cache when operator split bundle (#13678) Fixes #13677 Modifications: release the old bundle from the ownership cache and the temporary znode cache when the old bundle is split. (cherry picked from commit 0cd9462802b51b0026c9c641b82861aaaa8f2d9b) --- .../pulsar/broker/namespace/NamespaceService.java | 2 + .../broker/namespace/NamespaceServiceTest.java | 43 ++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index c0eee85..3c31f71 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -894,6 +894,8 @@ public class NamespaceService implements AutoCloseable { // update bundled_topic cache for load-report-generation pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); loadManager.get().setLoadReportForceUpdateFlag(); + // release old bundle from ownership cache + pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle); completionFuture.complete(null); if (unload) { // Unload new split bundles, in background. 
This will not diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index bb33ac8..1420bc6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -73,6 +73,7 @@ import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -491,6 +492,48 @@ public class NamespaceServiceTest extends BrokerTestBase { } } + + @Test + public void testSplitBundleAndRemoveOldBundleFromOwnerShipCache() throws Exception { + OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache()); + doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache).disableOwnership(any(NamespaceBundle.class)); + + Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); + ownership.setAccessible(true); + ownership.set(pulsar.getNamespaceService(), ownershipCache); + + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); + TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1"); + NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + + NamespaceBundle splitBundle1 = bundles.findBundle(topicName); + ownershipCache.tryAcquiringOwnership(splitBundle1); + CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(splitBundle1, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + try { + 
result1.get(); + } catch (Exception e) { + fail("split bundle failed", e); + } + Awaitility.await().untilAsserted(() + -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle1))); + + //unload split + bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + assertNotNull(bundles); + NamespaceBundle splitBundle2 = bundles.findBundle(topicName); + CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splitBundle2, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + try { + result2.get(); + } catch (Exception e) { + // make sure: NPE does not occur + fail("split bundle failed", e); + } + Awaitility.await().untilAsserted(() + -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle2))); + } + + @Test public void testSplitLargestBundle() throws Exception { String namespace = "prop/test/ns-abc2";
