This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03dc1be  After a bundle split, perform the unload in background (#7387)
03dc1be is described below

commit 03dc1be6e95d8f071e9e1cc2a9e6149f61d49f8e
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 29 22:45:44 2020 -0700

    After a bundle split, perform the unload in background (#7387)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 27 ++++++++--------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a1aba8f..77bd62e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -717,7 +717,7 @@ public class NamespaceService {
     void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
-                                       CompletableFuture<Void> unloadFuture,
+                                       CompletableFuture<Void> 
completionFuture,
                                        NamespaceBundleSplitAlgorithm 
splitAlgorithm) {
         splitAlgorithm.getSplitBoundary(this, 
bundle).whenComplete((splitBoundary, ex) -> {
             CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
@@ -793,15 +793,15 @@ public class NamespaceService {
                     // retry several times on BadVersion
                     if ((t instanceof ServerMetadataException) && 
(counter.decrementAndGet() >= 0)) {
                         pulsar.getOrderedExecutor()
-                            .execute(() -> 
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, 
splitAlgorithm));
+                            .execute(() -> 
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, completionFuture, 
splitAlgorithm));
                     } else if (t instanceof IllegalArgumentException) {
-                        unloadFuture.completeExceptionally(t);
+                        completionFuture.completeExceptionally(t);
                     } else {
                         // Retry enough, or meet other exception
                         String msg2 = format(" %s not success update 
nsBundles, counter %d, reason %s",
                             bundle.toString(), counter.get(), t.getMessage());
                         LOG.warn(msg2);
-                        unloadFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg2));
+                        completionFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg2));
                     }
                     return;
                 }
@@ -813,26 +813,19 @@ public class NamespaceService {
                             // update bundled_topic cache for 
load-report-generation
                             
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                             loadManager.get().setLoadReportForceUpdateFlag();
-                        })
-                        .thenCompose((v) -> {
-                            if (!unload) {
-                                // If we don't need to unload, then we're done 
here
-                                return CompletableFuture.completedFuture(null);
+                            completionFuture.complete(null);
+                            if (unload) {
+                                // Unload new split bundles, in background. 
This will not
+                                // affect the split operation which is already 
safely completed
+                                r.forEach(this::unloadNamespaceBundle);
                             }
-                            // unload new split bundles
-                            List<CompletableFuture<Void>> futures = new 
ArrayList<>();
-                            r.forEach(splitBundle -> 
futures.add(unloadNamespaceBundle(splitBundle)));
-                            return FutureUtil.waitForAll(futures);
-                        })
-                        .thenRun(() -> {
-                            unloadFuture.complete(null);
                         })
                         .exceptionally(e -> {
                             String msg1 = format(
                                     "failed to disable bundle %s under 
namespace [%s] with error %s",
                                     bundle.getNamespaceObject().toString(), 
bundle.toString(), ex.getMessage());
                             LOG.warn(msg1, e);
-                            unloadFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg1));
+                            completionFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg1));
                             return null;
                         });
             }, pulsar.getOrderedExecutor());

Reply via email to