heesung-sn commented on code in PR #19988:
URL: https://github.com/apache/pulsar/pull/19988#discussion_r1159304009
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -857,110 +858,158 @@ private CompletableFuture<Void> splitServiceUnit(String
serviceUnit, ServiceUnit
boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
});
boundaries = new ArrayList<>(boundariesSet);
- nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+ nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
}
final AtomicInteger counter = new AtomicInteger(0);
+ var childBundles =
data.splitServiceUnitToDestBroker().keySet().stream()
+ .map(child -> bundleFactory.getBundle(
+ bundle.getNamespaceObject().toString(), child))
+ .collect(Collectors.toList());
this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
nsBundleSplitAlgorithm,
- bundle, boundaries, serviceUnit, data, counter, startTime,
completionFuture);
+ bundle, childBundles, boundaries, data, counter, startTime,
completionFuture);
return completionFuture;
}
+
+
@VisibleForTesting
protected void splitServiceUnitOnceAndRetry(NamespaceService
namespaceService,
NamespaceBundleFactory
bundleFactory,
NamespaceBundleSplitAlgorithm
algorithm,
- NamespaceBundle bundle,
+ NamespaceBundle parentBundle,
+ List<NamespaceBundle>
childBundles,
List<Long> boundaries,
- String serviceUnit,
- ServiceUnitStateData data,
+ ServiceUnitStateData
parentData,
AtomicInteger counter,
long startTime,
CompletableFuture<Void>
completionFuture) {
- CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
-
- namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
- .thenAccept(splitBundlesPair -> {
- // Split and updateNamespaceBundles. Update may fail because of
concurrent write to Zookeeper.
- if (splitBundlesPair == null) {
- String msg = format("Bundle %s not found under namespace",
serviceUnit);
- updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
- return;
- }
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.sourceBroker(), VERSION_ID_INIT);
- NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
- List<NamespaceBundle> splitBundles =
Collections.unmodifiableList(splitBundlesPair.getRight());
- List<NamespaceBundle> successPublishedBundles =
- Collections.synchronizedList(new
ArrayList<>(splitBundles.size()));
- List<CompletableFuture<Void>> futures = new
ArrayList<>(splitBundles.size());
- for (NamespaceBundle sBundle : splitBundles) {
- futures.add(pubAsync(sBundle.toString(), next).thenAccept(__
-> successPublishedBundles.add(sBundle)));
- }
- NamespaceName nsname = bundle.getNamespaceObject();
- FutureUtil.waitForAll(futures)
- .thenCompose(__ ->
namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
- .thenCompose(__ ->
namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
- .thenRun(() -> {
-
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
- updateFuture.complete(splitBundles);
- }).exceptionally(e -> {
- // Clean the new bundle when has exception.
- List<CompletableFuture<Void>> futureList = new
ArrayList<>();
- for (NamespaceBundle sBundle :
successPublishedBundles) {
-
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
- }
- FutureUtil.waitForAll(futureList)
- .whenComplete((__, ex) -> {
- if (ex != null) {
- log.warn("Clean new bundles failed,",
ex);
- }
- updateFuture.completeExceptionally(e);
- });
- return null;
- });
- }).exceptionally(e -> {
- updateFuture.completeExceptionally(e);
- return null;
- });
+ ownChildBundles(childBundles, parentData)
Review Comment:
Also, if we don't pre-own the child bundles, there is a chance that they
might get assigned to other brokers right after the metadata update but
before the parent bundle moves to the deleted state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]