heesung-sn commented on code in PR #19988:
URL: https://github.com/apache/pulsar/pull/19988#discussion_r1155050949


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -826,110 +826,156 @@ private CompletableFuture<Void> splitServiceUnit(String 
serviceUnit, ServiceUnit
                 boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
             });
             boundaries = new ArrayList<>(boundariesSet);
-            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
         }
         final AtomicInteger counter = new AtomicInteger(0);
+        var childBundles = 
data.splitServiceUnitToDestBroker().keySet().stream()
+                .map(child -> bundleFactory.getBundle(
+                        bundle.getNamespaceObject().toString(), child))
+                .collect(Collectors.toList());
         this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, 
nsBundleSplitAlgorithm,
-                bundle, boundaries, serviceUnit, data, counter, startTime, 
completionFuture);
+                bundle, childBundles, boundaries, data, counter, startTime, 
completionFuture);
         return completionFuture;
     }
 
+
+
     @VisibleForTesting
     protected void splitServiceUnitOnceAndRetry(NamespaceService 
namespaceService,
                                                 NamespaceBundleFactory 
bundleFactory,
                                                 NamespaceBundleSplitAlgorithm 
algorithm,
-                                                NamespaceBundle bundle,
+                                                NamespaceBundle parentBundle,
+                                                List<NamespaceBundle> 
childBundles,
                                                 List<Long> boundaries,
-                                                String serviceUnit,
-                                                ServiceUnitStateData data,
+                                                ServiceUnitStateData 
parentData,
                                                 AtomicInteger counter,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
-        CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
-
-        namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
-                .thenAccept(splitBundlesPair -> {
-            // Split and updateNamespaceBundles. Update may fail because of 
concurrent write to Zookeeper.
-            if (splitBundlesPair == null) {
-                String msg = format("Bundle %s not found under namespace", 
serviceUnit);
-                updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
-                return;
+        ownChildBundles(childBundles, parentData)
+                .thenCompose(__ -> updateSplitNamespaceBundlesAsync(
+                        namespaceService, bundleFactory, algorithm, 
parentBundle, childBundles, boundaries))
+                .thenAccept(__ -> // Update bundled_topic cache for 
load-report-generation
+                        
pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))
+                .thenAccept(__ -> pubAsync(parentBundle.toString(), new 
ServiceUnitStateData(
+                                Deleted, null, parentData.sourceBroker(), 
getNextVersionId(parentData))))
+                .thenAccept(__ -> {
+                    double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+                    log.info("Successfully split {} parent namespace-bundle to 
{} in {} ms",
+                            parentBundle, childBundles, splitBundleTime);
+                    completionFuture.complete(null);
+                })
+                .exceptionally(ex -> {
+                    // Retry several times on BadVersion
+                    Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
+                    if ((throwable instanceof 
MetadataStoreException.BadVersionException)
+                            && (counter.incrementAndGet() < 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
+                        log.warn("Failed to update bundle range in metadata 
store. Retrying {} th / {} limit",
+                                counter.get(), 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
+                        pulsar.getExecutor().schedule(() -> 
splitServiceUnitOnceAndRetry(
+                                namespaceService, bundleFactory, algorithm, 
parentBundle, childBundles,
+                                        boundaries, parentData, counter, 
startTime, completionFuture),
+                                100, MILLISECONDS);
+                    } else {
+                        // Retry enough, or meet other exception
+                        String msg = format("Failed to split bundle %s, 
Retried %d th / %d limit, reason %s",
+                                parentBundle.toString(), counter.get(),
+                                NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, 
throwable.getMessage());
+                        log.warn(msg, throwable);
+                        completionFuture.completeExceptionally(
+                                new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+                    }
+                    return null;
+                });
+    }
+
+    private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle> 
childBundles,
+                                                    ServiceUnitStateData 
parentData) {
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(childBundles.size());
+        var debug = debug();
+        for (var childBundle : childBundles) {
+            var childBundleStr = childBundle.toString();
+            var childData = tableview.get(childBundleStr);
+            if (childData != null) {
+                if (debug) {
+                    log.info("Already owned child bundle:{}", childBundleStr);
+                }
+            } else {
+                childData = new ServiceUnitStateData(Owned, 
parentData.sourceBroker(),
+                        VERSION_ID_INIT);
+                futures.add(pubAsync(childBundleStr, childData).thenApply(__ 
-> null));
+            }
+        }
+
+        if (!futures.isEmpty()) {
+            return FutureUtil.waitForAll(futures);
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
+            NamespaceService namespaceService,
+            NamespaceBundleFactory bundleFactory,
+            NamespaceBundleSplitAlgorithm algorithm,
+            NamespaceBundle parentBundle,
+            List<NamespaceBundle> childBundles,
+            List<Long> boundaries) {
+        CompletableFuture<Void> updateSplitNamespaceBundlesFuture = new 
CompletableFuture<>();
+        var namespaceName = parentBundle.getNamespaceObject();
+        final var debug = debug();
+        var targetNsBundle = 
bundleFactory.getBundles(parentBundle.getNamespaceObject());
+        boolean updated = false;
+        try {
+            targetNsBundle.validateBundle(parentBundle);
+        } catch (IllegalArgumentException e) {
+            if (debug) {
+                log.info("Namespace bundles do not contain the parent 
bundle:{}",

Review Comment:
   If the namespace bundles do not contain the parent bundle, the namespace 
bundles must already contain the child bundles (i.e., the namespace policy must 
already have been updated). Here, we redundantly verify that the child bundles 
are present. If neither the parent bundle nor the child bundles are present, the 
current logic will retry forever. I can't think of a case in which the current 
logic leads to this state, but in the worst case, the split error count in the 
metrics will keep increasing, and we expect admins to notice and unload this 
parent bundle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to