heesung-sn commented on code in PR #19988:
URL: https://github.com/apache/pulsar/pull/19988#discussion_r1159269952


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -857,110 +858,158 @@ private CompletableFuture<Void> splitServiceUnit(String 
serviceUnit, ServiceUnit
                 boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
             });
             boundaries = new ArrayList<>(boundariesSet);
-            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
         }
         final AtomicInteger counter = new AtomicInteger(0);
+        var childBundles = 
data.splitServiceUnitToDestBroker().keySet().stream()
+                .map(child -> bundleFactory.getBundle(
+                        bundle.getNamespaceObject().toString(), child))
+                .collect(Collectors.toList());
         this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, 
nsBundleSplitAlgorithm,
-                bundle, boundaries, serviceUnit, data, counter, startTime, 
completionFuture);
+                bundle, childBundles, boundaries, data, counter, startTime, 
completionFuture);
         return completionFuture;
     }
 
+
+
     @VisibleForTesting
     protected void splitServiceUnitOnceAndRetry(NamespaceService 
namespaceService,
                                                 NamespaceBundleFactory 
bundleFactory,
                                                 NamespaceBundleSplitAlgorithm 
algorithm,
-                                                NamespaceBundle bundle,
+                                                NamespaceBundle parentBundle,
+                                                List<NamespaceBundle> 
childBundles,
                                                 List<Long> boundaries,
-                                                String serviceUnit,
-                                                ServiceUnitStateData data,
+                                                ServiceUnitStateData 
parentData,
                                                 AtomicInteger counter,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
-        CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
-
-        namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
-                .thenAccept(splitBundlesPair -> {
-            // Split and updateNamespaceBundles. Update may fail because of 
concurrent write to Zookeeper.
-            if (splitBundlesPair == null) {
-                String msg = format("Bundle %s not found under namespace", 
serviceUnit);
-                updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
-                return;
-            }
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.sourceBroker(), VERSION_ID_INIT);
-            NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
-            List<NamespaceBundle> splitBundles = 
Collections.unmodifiableList(splitBundlesPair.getRight());
-            List<NamespaceBundle> successPublishedBundles =
-                    Collections.synchronizedList(new 
ArrayList<>(splitBundles.size()));
-            List<CompletableFuture<Void>> futures = new 
ArrayList<>(splitBundles.size());
-            for (NamespaceBundle sBundle : splitBundles) {
-                futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ 
-> successPublishedBundles.add(sBundle)));
-            }
-            NamespaceName nsname = bundle.getNamespaceObject();
-            FutureUtil.waitForAll(futures)
-                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
-                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
-                    .thenRun(() -> {
-                        
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-                        updateFuture.complete(splitBundles);
-                    }).exceptionally(e -> {
-                        // Clean the new bundle when has exception.
-                        List<CompletableFuture<Void>> futureList = new 
ArrayList<>();
-                        for (NamespaceBundle sBundle : 
successPublishedBundles) {
-                            
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
-                        }
-                        FutureUtil.waitForAll(futureList)
-                                .whenComplete((__, ex) -> {
-                                    if (ex != null) {
-                                        log.warn("Clean new bundles failed,", 
ex);
-                                    }
-                                    updateFuture.completeExceptionally(e);
-                                });
-                        return null;
-                    });
-        }).exceptionally(e -> {
-            updateFuture.completeExceptionally(e);
-            return null;
-        });
+        ownChildBundles(childBundles, parentData)

Review Comment:
   Child bundles won't be looked up until we update NamespaceBundles in the 
metadata store. Meanwhile, the topic will still look up the parent bundle.
   
   ```
   public NamespaceBundle getBundle(TopicName topic) {
           NamespaceBundles bundles = 
bundlesCache.synchronous().get(topic.getNamespaceObject());
           return bundles != null ? bundles.findBundle(topic) : null;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to