This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03dc1be After a bundle split, perform the unload in background (#7387)
03dc1be is described below
commit 03dc1be6e95d8f071e9e1cc2a9e6149f61d49f8e
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 29 22:45:44 2020 -0700
After a bundle split, perform the unload in background (#7387)
---
.../pulsar/broker/namespace/NamespaceService.java | 27 ++++++++--------------
1 file changed, 10 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a1aba8f..77bd62e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -717,7 +717,7 @@ public class NamespaceService {
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
boolean unload,
AtomicInteger counter,
- CompletableFuture<Void> unloadFuture,
+ CompletableFuture<Void>
completionFuture,
NamespaceBundleSplitAlgorithm
splitAlgorithm) {
splitAlgorithm.getSplitBoundary(this,
bundle).whenComplete((splitBoundary, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
@@ -793,15 +793,15 @@ public class NamespaceService {
// retry several times on BadVersion
if ((t instanceof ServerMetadataException) &&
(counter.decrementAndGet() >= 0)) {
pulsar.getOrderedExecutor()
- .execute(() ->
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture,
splitAlgorithm));
+ .execute(() ->
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, completionFuture,
splitAlgorithm));
} else if (t instanceof IllegalArgumentException) {
- unloadFuture.completeExceptionally(t);
+ completionFuture.completeExceptionally(t);
} else {
// Retry enough, or meet other exception
String msg2 = format(" %s not success update
nsBundles, counter %d, reason %s",
bundle.toString(), counter.get(), t.getMessage());
LOG.warn(msg2);
- unloadFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg2));
+ completionFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg2));
}
return;
}
@@ -813,26 +813,19 @@ public class NamespaceService {
// update bundled_topic cache for
load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.get().setLoadReportForceUpdateFlag();
- })
- .thenCompose((v) -> {
- if (!unload) {
- // If we don't need to unload, then we're done
here
- return CompletableFuture.completedFuture(null);
+ completionFuture.complete(null);
+ if (unload) {
+ // Unload new split bundles, in background.
This will not
+ // affect the split operation which is already
safely completed
+ r.forEach(this::unloadNamespaceBundle);
}
- // unload new split bundles
- List<CompletableFuture<Void>> futures = new
ArrayList<>();
- r.forEach(splitBundle ->
futures.add(unloadNamespaceBundle(splitBundle)));
- return FutureUtil.waitForAll(futures);
- })
- .thenRun(() -> {
- unloadFuture.complete(null);
})
.exceptionally(e -> {
String msg1 = format(
"failed to disable bundle %s under
namespace [%s] with error %s",
bundle.getNamespaceObject().toString(),
bundle.toString(), ex.getMessage());
LOG.warn(msg1, e);
- unloadFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg1));
+ completionFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg1));
return null;
});
}, pulsar.getOrderedExecutor());