heesung-sn commented on code in PR #19773:
URL: https://github.com/apache/pulsar/pull/19773#discussion_r1136042304
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
long startTime,
CompletableFuture<Void>
completionFuture) {
CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
+ Map<String, Optional<String>> bundleToDestBroker =
data.splitServiceUnitToDestBroker();
+ // 1. verify the sub bundles is correct.
+ if (bundleToDestBroker.size() != 2) {
Review Comment:
We should move this validation to the constructors if required.
```
public record Split(
        String serviceUnit, String sourceBroker,
        Map<String, Optional<String>> splitServiceUnitToDestBroker) {
    public Split {
        Objects.requireNonNull(serviceUnit);
    }
}

public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker,
                            Map<String, Optional<String>> splitServiceUnitToDestBroker,
                            long versionId) {
    this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, false,
            System.currentTimeMillis(), versionId);
}
```
Also, split requests from the split scheduler currently pass an empty (or
null) splitServiceUnitToDestBroker and expect the split boundaries to be
auto-selected by the configured algorithm.
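A minimal sketch of what constructor-level validation could look like, assuming a null or empty map means "let the configured algorithm pick the boundaries" and a non-empty map must name exactly two child bundles. The name `SplitSketch`, the helper `hasExplicitSplit()`, and the exact rules are illustrative assumptions, not the PR's code.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the Split record; the validation rules are assumptions.
record SplitSketch(String serviceUnit, String sourceBroker,
                   Map<String, Optional<String>> splitServiceUnitToDestBroker) {

    SplitSketch {
        Objects.requireNonNull(serviceUnit);
        // A null or empty map is accepted: the scheduler path sends no child bundles.
        // A non-empty map is only accepted when it names exactly two child bundles.
        if (splitServiceUnitToDestBroker != null
                && !splitServiceUnitToDestBroker.isEmpty()
                && splitServiceUnitToDestBroker.size() != 2) {
            throw new IllegalArgumentException(String.format(
                    "Bundle %s should be split into 2 bundles, but got %d bundles",
                    serviceUnit, splitServiceUnitToDestBroker.size()));
        }
    }

    // True only when the caller named the child bundles explicitly.
    boolean hasExplicitSplit() {
        return splitServiceUnitToDestBroker != null && !splitServiceUnitToDestBroker.isEmpty();
    }
}
```

With this shape, the channel code would not need its own size check, and a scheduler-style request with an empty map would still construct successfully.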
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
long startTime,
CompletableFuture<Void>
completionFuture) {
CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
+ Map<String, Optional<String>> bundleToDestBroker =
data.splitServiceUnitToDestBroker();
+ // 1. verify the sub bundles is correct.
+ if (bundleToDestBroker.size() != 2) {
+ String msg = format("Bundle %s should be split into 2 bundles, but
got %d bundles",
+ serviceUnit, bundleToDestBroker.size());
+ updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
+ return;
+ }
+ String namespace = bundle.getNamespaceObject().toString();
+
+ // 2. get the split boundary.
+ Set<Long> boundaries = new HashSet<>();
+ bundleToDestBroker.forEach((bundleRange, destBroker) -> {
+ NamespaceBundle subBundle = bundleFactory.getBundle(namespace,
bundleRange);
+ boundaries.add(subBundle.getKeyRange().lowerEndpoint());
+ boundaries.add(subBundle.getKeyRange().upperEndpoint());
+ });
- pulsar.getNamespaceService().getSplitBoundary(bundle,
null).thenAccept(splitBundlesPair -> {
+ // 3. split the bundle use the specified boundary.
+ NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
+ NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+ pulsar.getNamespaceService().getSplitBoundary(bundle,
nsBundleSplitAlgorithm, new ArrayList<>(boundaries))
Review Comment:
Again, the split scheduler does not pass the boundaries. It seems that this
change will break the split scheduler behavior.
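One way to keep the scheduler path working is to fall back to the configured algorithm whenever no boundaries are supplied. The sketch below is hypothetical: `SplitAlgorithmChooser` and its parameters are illustrative names, and the algorithm names are passed in as plain strings rather than looked up from Pulsar.

```java
import java.util.List;

// Hypothetical helper; not part of the PR or of Pulsar.
final class SplitAlgorithmChooser {

    private SplitAlgorithmChooser() {
    }

    // Returns the algorithm name to use for a split request.
    // No boundaries (null or empty) means the request came without explicit
    // positions, so the configured algorithm is kept.
    static String choose(List<Long> boundaries,
                         String configuredAlgorithm,
                         String specifiedPositionsAlgorithm) {
        if (boundaries == null || boundaries.isEmpty()) {
            return configuredAlgorithm;
        }
        return specifiedPositionsAlgorithm;
    }
}
```

Only requests that carry boundaries, such as an admin-initiated split, would then take the specified-positions path.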
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -369,6 +378,53 @@ private CompletableFuture<Void> unloadAsync(Unload unload,
return unloadManager.waitAsync(future, unload.serviceUnit(), timeout,
timeoutUnit);
}
+ public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId
bundle,
+
NamespaceBundleSplitAlgorithm splitAlgorithm,
+ List<Long>
boundaries) {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString());
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
+ NamespaceBundle namespaceBundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName,
bundleRange);
+ return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle,
splitAlgorithm, boundaries)
+ .thenCompose(splitBundlesPair -> {
+ if (splitBundlesPair == null) {
+ String msg = format("Bundle %s not found under
namespace", namespaceBundle);
+ log.error(msg);
+ return FutureUtil.failedFuture(new
IllegalStateException(msg));
+ }
+
+ return getOwnershipAsync(Optional.empty(), bundle)
+ .thenCompose(brokerOpt -> {
+ if (brokerOpt.isEmpty()) {
+ String msg = String.format("Namespace
bundle: %s is not owned by any broker.",
+ bundle);
+ log.warn(msg);
+ throw new IllegalStateException(msg);
+ }
+ String sourceBroker = brokerOpt.get();
+ SplitDecision splitDecision = new
SplitDecision();
+ List<NamespaceBundle> splitBundles =
splitBundlesPair.getRight();
+ Map<String, Optional<String>>
splitServiceUnitToDestBroker = new HashMap<>();
+ splitBundles.forEach(splitBundle ->
splitServiceUnitToDestBroker
+ .put(splitBundle.getBundleRange(),
Optional.empty()));
+ splitDecision.setSplit(
+ new Split(bundle.toString(),
sourceBroker, splitServiceUnitToDestBroker));
Review Comment:
Can we just pass `List<Long> boundaries` to `new Split`?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java:
##########
@@ -553,11 +552,24 @@ public void splitAndRetryTest() throws Exception {
}
// Call the real method
reset(namespaceService);
+ doReturn(CompletableFuture.completedFuture(List.of("test-topic-1",
"test-topic-2")))
+
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
return future;
}).when(namespaceService).updateNamespaceBundles(any(), any());
doReturn(namespaceService).when(pulsar1).getNamespaceService();
-
- Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>());
Review Comment:
As commented above, the split scheduler passes an empty map, and this BSC
test case validates that the split event completes with an empty map.
Please add another unit test for the non-empty case (the admin split case).
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java:
##########
@@ -27,7 +29,8 @@
*/
public record ServiceUnitStateData(
- ServiceUnitState state, String dstBroker, String sourceBroker, boolean
force, long timestamp, long versionId) {
+ ServiceUnitState state, String dstBroker, String sourceBroker,
+ Map<String, Optional<String>> splitServiceUnitToDestBroker, boolean
force, long timestamp, long versionId) {
Review Comment:
1. Don't we need to keep the order of boundaries? Can it be a List?
2. Can you explain why we have the dstBroker value? Can we remove the
dstBroker?
3. Currently, I think we only require split boundaries. Can we make it
`List<Long> splitBoundaries`?
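To illustrate point 3, here is a rough sketch of carrying only an ordered `List<Long> splitBoundaries` and deriving the child ranges from it. The record name, the `lower`/`upper` fields, and the `0x%08x_0x%08x` range format are assumptions made for the example; the real `ServiceUnitStateData` has different fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record: the parent key range plus ordered split boundaries.
record SplitBoundariesSketch(long lower, long upper, List<Long> splitBoundaries) {

    SplitBoundariesSketch {
        // Null means "no explicit boundaries"; List.copyOf keeps the given order.
        splitBoundaries = splitBoundaries == null ? List.of() : List.copyOf(splitBoundaries);
        long previous = lower;
        for (long boundary : splitBoundaries) {
            // Boundaries must be strictly increasing and strictly inside the parent range.
            if (boundary <= previous || boundary >= upper) {
                throw new IllegalArgumentException("Invalid split boundary: " + boundary);
            }
            previous = boundary;
        }
    }

    // Child ranges implied by the boundaries; with no boundaries this is just the parent range.
    List<String> childRanges() {
        List<String> ranges = new ArrayList<>();
        long start = lower;
        for (long boundary : splitBoundaries) {
            ranges.add(String.format("0x%08x_0x%08x", start, boundary));
            start = boundary;
        }
        ranges.add(String.format("0x%08x_0x%08x", start, upper));
        return ranges;
    }
}
```

Because the list is ordered and validated once, no `Set` to `List` conversion is needed later, and the destination broker does not have to be stored per child bundle.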
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java:
##########
@@ -36,16 +39,22 @@ public record ServiceUnitStateData(
}
}
+ public ServiceUnitStateData(ServiceUnitState state, String dstBroker,
String sourceBroker,
+ Map<String, Optional<String>>
splitServiceUnitToDestBroker, long versionId) {
+ this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker,
false,
+ System.currentTimeMillis(), versionId);
+ }
+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker,
String sourceBroker, long versionId) {
- this(state, dstBroker, sourceBroker, false,
System.currentTimeMillis(), versionId);
+ this(state, dstBroker, sourceBroker, Map.of(), false,
System.currentTimeMillis(), versionId);
Review Comment:
`Map.of` seems redundant. Can it be null? (The same applies to the
constructors below.)
##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java:
##########
@@ -28,7 +28,7 @@
/**
* This algorithm divides the bundle into several parts by the specified
positions.
*/
-public class SpecifiedPositionsBundleSplitAlgorithm implements
NamespaceBundleSplitAlgorithm{
Review Comment:
It seems there is only a whitespace change in this class. Can you revert
this part?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
long startTime,
CompletableFuture<Void>
completionFuture) {
CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
+ Map<String, Optional<String>> bundleToDestBroker =
data.splitServiceUnitToDestBroker();
+ // 1. verify the sub bundles is correct.
+ if (bundleToDestBroker.size() != 2) {
+ String msg = format("Bundle %s should be split into 2 bundles, but
got %d bundles",
+ serviceUnit, bundleToDestBroker.size());
+ updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
+ return;
+ }
+ String namespace = bundle.getNamespaceObject().toString();
+
+ // 2. get the split boundary.
+ Set<Long> boundaries = new HashSet<>();
Review Comment:
This conversion looks redundant:
`Set<Long> boundaries = new HashSet<>();`
...
`new ArrayList<>(boundaries)`
As mentioned above, can we pass `List<Long> boundaries` in
`ServiceUnitStateData`?