This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a850934 Fix NPE when splitting and unloading bundle (#2348)
a850934 is described below
commit a8509341cc8373e849a52bffdb140184371509a3
Author: massakam <[email protected]>
AuthorDate: Fri Aug 10 13:38:52 2018 +0900
Fix NPE when splitting and unloading bundle (#2348)
---
.../pulsar/broker/namespace/NamespaceService.java | 6 ++--
.../broker/namespace/NamespaceServiceTest.java | 42 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index bf44519..d9a1ccf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -593,7 +593,7 @@ public class NamespaceService {
boolean unload,
AtomicInteger counter,
CompletableFuture<Void> unloadFuture) {
- CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();
+ CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles =
bundleFactory.splitBundles(bundle,
2 /* by default split into 2 */);
@@ -622,7 +622,7 @@ public class NamespaceService {
// namespace bundle
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
- updateFuture.complete(splittedBundles.getLeft());
+ updateFuture.complete(splittedBundles.getRight());
} else if (rc == Code.BADVERSION.intValue()) {
KeeperException keeperException =
KeeperException.create(KeeperException.Code.get(rc));
String msg = format("failed to update namespace
policies [%s], NamespaceBundle: %s " +
@@ -680,7 +680,7 @@ public class NamespaceService {
if (unload) {
// unload new split bundles
- r.getBundles().forEach(splitBundle -> {
+ r.forEach(splitBundle -> {
try {
unloadNamespaceBundle(splitBundle);
} catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a2ef397..2c51e16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -429,6 +429,48 @@ public class NamespaceServiceTest extends BrokerTestBase {
}
+ @Test
+ public void testRemoveOwnershipAndSplitBundle() throws Exception {
+ OwnershipCache ownershipCache =
spy(pulsar.getNamespaceService().getOwnershipCache());
+
doNothing().when(ownershipCache).disableOwnership(any(NamespaceBundle.class));
+
+ Field ownership =
NamespaceService.class.getDeclaredField("ownershipCache");
+ ownership.setAccessible(true);
+ ownership.set(pulsar.getNamespaceService(), ownershipCache);
+
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+ NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
+ TopicName topicName =
TopicName.get("persistent://pulsar/global/ns1/topic-1");
+ NamespaceBundles bundles =
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+ NamespaceBundle originalBundle = bundles.findBundle(topicName);
+
+ CompletableFuture<Void> result1 =
namespaceService.splitAndOwnBundle(originalBundle, false);
+ try {
+ result1.get();
+ } catch (Exception e) {
+ fail("split bundle faild", e);
+ }
+
+ NamespaceBundles updatedNsBundles =
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+ assertNotNull(updatedNsBundles);
+ NamespaceBundle splittedBundle =
updatedNsBundles.findBundle(topicName);
+
+ updatedNsBundles.getBundles().stream().filter(bundle ->
!bundle.equals(splittedBundle)).forEach(bundle -> {
+ try {
+ ownershipCache.removeOwnership(bundle).get();
+ } catch (Exception e) {
+ fail("faild to remove ownership", e);
+ }
+ });
+
+ CompletableFuture<Void> result2 =
namespaceService.splitAndOwnBundle(splittedBundle, true);
+ try {
+ result2.get();
+ } catch (Exception e) {
+ // make sure: NPE does not occur
+ fail("split bundle faild", e);
+ }
+ }
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>>
splitBundles(NamespaceBundleFactory utilityFactory,