merlimat closed pull request #2348: Fix NPE when splitting and unloading bundle
URL: https://github.com/apache/incubator-pulsar/pull/2348
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer displays it once the pull request is merged:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index bf44519e9b..d9a1ccf600 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -593,7 +593,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> unloadFuture) {
-        CompletableFuture<NamespaceBundles> updateFuture = new 
CompletableFuture<>();
+        CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
 
         final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = 
bundleFactory.splitBundles(bundle,
             2 /* by default split into 2 */);
@@ -622,7 +622,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                             // namespace bundle
                             
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
 
-                            updateFuture.complete(splittedBundles.getLeft());
+                            updateFuture.complete(splittedBundles.getRight());
                         } else if (rc == Code.BADVERSION.intValue()) {
                             KeeperException keeperException = 
KeeperException.create(KeeperException.Code.get(rc));
                             String msg = format("failed to update namespace 
policies [%s], NamespaceBundle: %s " +
@@ -680,7 +680,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
 
                 if (unload) {
                     // unload new split bundles
-                    r.getBundles().forEach(splitBundle -> {
+                    r.forEach(splitBundle -> {
                         try {
                             unloadNamespaceBundle(splitBundle);
                         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a2ef39726f..2c51e16fb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -429,6 +429,48 @@ public void 
testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
 
     }
 
+    @Test
+    public void testRemoveOwnershipAndSplitBundle() throws Exception {
+        OwnershipCache ownershipCache = 
spy(pulsar.getNamespaceService().getOwnershipCache());
+        
doNothing().when(ownershipCache).disableOwnership(any(NamespaceBundle.class));
+
+        Field ownership = 
NamespaceService.class.getDeclaredField("ownershipCache");
+        ownership.setAccessible(true);
+        ownership.set(pulsar.getNamespaceService(), ownershipCache);
+
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
+        TopicName topicName = 
TopicName.get("persistent://pulsar/global/ns1/topic-1");
+        NamespaceBundles bundles = 
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+        NamespaceBundle originalBundle = bundles.findBundle(topicName);
+
+        CompletableFuture<Void> result1 = 
namespaceService.splitAndOwnBundle(originalBundle, false);
+        try {
+            result1.get();
+        } catch (Exception e) {
+            fail("split bundle faild", e);
+        }
+
+        NamespaceBundles updatedNsBundles = 
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+        assertNotNull(updatedNsBundles);
+        NamespaceBundle splittedBundle = 
updatedNsBundles.findBundle(topicName);
+
+        updatedNsBundles.getBundles().stream().filter(bundle -> 
!bundle.equals(splittedBundle)).forEach(bundle -> {
+            try {
+                ownershipCache.removeOwnership(bundle).get();
+            } catch (Exception e) {
+                fail("faild to remove ownership", e);
+            }
+        });
+
+        CompletableFuture<Void> result2 = 
namespaceService.splitAndOwnBundle(splittedBundle, true);
+        try {
+            result2.get();
+        } catch (Exception e) {
+            // make sure: NPE does not occur
+            fail("split bundle faild", e);
+        }
+    }
 
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> 
splitBundles(NamespaceBundleFactory utilityFactory,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to