This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 43dccccb92b6db46586a51d18c157327e6c071a0 Author: LinChen <[email protected]> AuthorDate: Sun Oct 9 21:24:24 2022 +0800 [fix][broker] Update new bundle-range to policies after bundle split (#17797) Co-authored-by: leolinchen <[email protected]> --- .../pulsar/broker/namespace/NamespaceService.java | 42 +++++++++++++++++++--- .../broker/namespace/NamespaceServiceTest.java | 11 ++++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5c288af7b8a..12dc31f3428 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -34,6 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -89,6 +90,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -867,11 +869,12 @@ public class NamespaceService implements AutoCloseable { for (NamespaceBundle sBundle : splittedBundles.getRight()) { checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); } - updateNamespaceBundles(nsname, splittedBundles.getLeft()) - .thenRun(() -> { - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - updateFuture.complete(splittedBundles.getRight()); - }).exceptionally(ex1 -> { + updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> { + return updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft()); + }).thenRun(() -> { + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + updateFuture.complete(splittedBundles.getRight()); + }).exceptionally(ex1 -> { String msg = format("failed to update namespace policies [%s], " + "NamespaceBundle: %s due to %s", nsname.toString(), bundle.getBundleRange(), ex1.getMessage()); @@ -946,6 +949,35 @@ public class NamespaceService implements AutoCloseable { }); } + /** + * Update new bundle-range to admin/policies/namespace. + * Update may fail because of concurrent write to Zookeeper. + * + * @param nsname + * @param nsBundles + * @throws Exception + */ + private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname, + NamespaceBundles nsBundles) { + Objects.requireNonNull(nsname); + Objects.requireNonNull(nsBundles); + + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies -> { + if (policies.isPresent()) { + return pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(nsname, oldPolicies -> { + oldPolicies.bundles = nsBundles.getBundlesData(); + return oldPolicies; + }); + } else { + LOG.error("Policies of namespace {} is not exist!", nsname); + Policies newPolicies = new Policies(); + newPolicies.bundles = nsBundles.getBundlesData(); + return pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(nsname, newPolicies); + } + }); + } + + /** * Update new bundle-range to LocalZk (create a new node if not present). * Update may fail because of concurrent write to Zookeeper. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 2b982d2e059..fe3976c6d1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -159,11 +159,16 @@ public class NamespaceServiceTest extends BrokerTestBase { splitBundleSet.removeAll(bundleList); assertTrue(splitBundleSet.isEmpty()); - // (2) validate LocalZookeeper policies updated with newly created split + // (2) validate localPolicies and policies updated with newly created split // bundles - LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get(); - NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles); + LocalPolicies localPolicies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get(); + NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, localPolicies.bundles); assertEquals(localZkBundles, updatedNsBundles); + log.info("LocalPolicies: {}", localPolicies); + + Policies policies = pulsar.getPulsarResources().getNamespaceResources().getPolicies(nsname).get(); + NamespaceBundles zkBundles = bundleFactory.getBundles(nsname, policies.bundles); + assertEquals(zkBundles, updatedNsBundles); log.info("Policies: {}", policies); // (3) validate ownership of new split bundles by local owner
