heesung-sn commented on code in PR #19773:
URL: https://github.com/apache/pulsar/pull/19773#discussion_r1136042304


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void 
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
         CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
+        Map<String, Optional<String>> bundleToDestBroker = 
data.splitServiceUnitToDestBroker();
+        // 1. verify the sub bundles is correct.
+        if (bundleToDestBroker.size() != 2) {

Review Comment:
   We should move this validation to the constructors if required. 
   
   ```
   public record Split(
           String serviceUnit, String sourceBroker, Map<String, 
Optional<String>> splitServiceUnitToDestBroker) {
   
       public Split {
           Objects.requireNonNull(serviceUnit);
       }
   }
   
   public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String 
sourceBroker,
                                   Map<String, Optional<String>> 
splitServiceUnitToDestBroker, long versionId) {
           this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, 
false,
                   System.currentTimeMillis(), versionId);
       }
   ```
   
   Also, currently, split requests from the split scheduler do pass an empty 
splitServiceUnitToDestBroker (or null), expecting split boundaries to be 
auto-selected based on the configured algo.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void 
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
         CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
+        Map<String, Optional<String>> bundleToDestBroker = 
data.splitServiceUnitToDestBroker();
+        // 1. verify the sub bundles is correct.
+        if (bundleToDestBroker.size() != 2) {
+            String msg = format("Bundle %s should be split into 2 bundles, but 
got %d bundles",
+                    serviceUnit, bundleToDestBroker.size());
+            updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+            return;
+        }
+        String namespace = bundle.getNamespaceObject().toString();
+
+        // 2. get the split boundary.
+        Set<Long> boundaries = new HashSet<>();
+        bundleToDestBroker.forEach((bundleRange, destBroker) -> {
+            NamespaceBundle subBundle = bundleFactory.getBundle(namespace, 
bundleRange);
+            boundaries.add(subBundle.getKeyRange().lowerEndpoint());
+            boundaries.add(subBundle.getKeyRange().upperEndpoint());
+        });
 
-        pulsar.getNamespaceService().getSplitBoundary(bundle, 
null).thenAccept(splitBundlesPair -> {
+        // 3. split the bundle use the specified boundary.
+        NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
+                NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+        pulsar.getNamespaceService().getSplitBoundary(bundle, 
nsBundleSplitAlgorithm, new ArrayList<>(boundaries))

Review Comment:
   Again, the split scheduler does not pass the boundaries. It seems that this 
change will break the split scheduler behavior.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -369,6 +378,53 @@ private CompletableFuture<Void> unloadAsync(Unload unload,
         return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, 
timeoutUnit);
     }
 
+    public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId 
bundle,
+                                                             
NamespaceBundleSplitAlgorithm splitAlgorithm,
+                                                             List<Long> 
boundaries) {
+        final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString());
+        final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
+        NamespaceBundle namespaceBundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName,
 bundleRange);
+        return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle, 
splitAlgorithm, boundaries)
+                .thenCompose(splitBundlesPair -> {
+                    if (splitBundlesPair == null) {
+                        String msg = format("Bundle %s not found under 
namespace", namespaceBundle);
+                        log.error(msg);
+                        return FutureUtil.failedFuture(new 
IllegalStateException(msg));
+                    }
+
+                    return getOwnershipAsync(Optional.empty(), bundle)
+                            .thenCompose(brokerOpt -> {
+                                if (brokerOpt.isEmpty()) {
+                                    String msg = String.format("Namespace 
bundle: %s is not owned by any broker.",
+                                            bundle);
+                                    log.warn(msg);
+                                    throw new IllegalStateException(msg);
+                                }
+                                String sourceBroker = brokerOpt.get();
+                                SplitDecision splitDecision = new 
SplitDecision();
+                                List<NamespaceBundle> splitBundles = 
splitBundlesPair.getRight();
+                                Map<String, Optional<String>> 
splitServiceUnitToDestBroker = new HashMap<>();
+                                splitBundles.forEach(splitBundle -> 
splitServiceUnitToDestBroker
+                                        .put(splitBundle.getBundleRange(), 
Optional.empty()));
+                                splitDecision.setSplit(
+                                        new Split(bundle.toString(), 
sourceBroker, splitServiceUnitToDestBroker));

Review Comment:
   can we just pass `List<Long> boundaries` to `new Split`?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java:
##########
@@ -553,11 +552,24 @@ public void splitAndRetryTest() throws Exception {
             }
             // Call the real method
             reset(namespaceService);
+            doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", 
"test-topic-2")))
+                    
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
             return future;
         }).when(namespaceService).updateNamespaceBundles(any(), any());
         doReturn(namespaceService).when(pulsar1).getNamespaceService();
-
-        Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>());

Review Comment:
   Like commented above, the split scheduler passes an empty map, and this BSC 
test case validates if the split event completes with an empty map.
   
   Please provide another UT for the non-empty case(admin split case).



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java:
##########
@@ -27,7 +29,8 @@
  */
 
 public record ServiceUnitStateData(
-        ServiceUnitState state, String dstBroker, String sourceBroker, boolean 
force, long timestamp, long versionId) {
+        ServiceUnitState state, String dstBroker, String sourceBroker,
+        Map<String, Optional<String>> splitServiceUnitToDestBroker, boolean 
force, long timestamp, long versionId) {

Review Comment:
   1. Don't we need to keep the order of boundaries? Can it be a List?
   
   2. Can you explain why we have the dstBroker value? Can we remove the 
dstBroker? 
   
   3. Currently, I think we only require split boundaries. Can we make it 
`List<Long> splitBoundaries`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java:
##########
@@ -36,16 +39,22 @@ public record ServiceUnitStateData(
         }
     }
 
+    public ServiceUnitStateData(ServiceUnitState state, String dstBroker, 
String sourceBroker,
+                                Map<String, Optional<String>> 
splitServiceUnitToDestBroker, long versionId) {
+        this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, 
false,
+                System.currentTimeMillis(), versionId);
+    }
+
     public ServiceUnitStateData(ServiceUnitState state, String dstBroker, 
String sourceBroker, long versionId) {
-        this(state, dstBroker, sourceBroker, false, 
System.currentTimeMillis(), versionId);
+        this(state, dstBroker, sourceBroker, Map.of(), false, 
System.currentTimeMillis(), versionId);

Review Comment:
   `Map.of` seems redundant. Can it be null? (applies the same for the below 
constructors)



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java:
##########
@@ -28,7 +28,7 @@
 /**
  * This algorithm divides the bundle into several parts by the specified 
positions.
  */
-public class SpecifiedPositionsBundleSplitAlgorithm implements 
NamespaceBundleSplitAlgorithm{

Review Comment:
   Seems like there is only whitespace change in this class. Can you revert 
this part?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -799,8 +801,29 @@ protected void 
splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
         CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
+        Map<String, Optional<String>> bundleToDestBroker = 
data.splitServiceUnitToDestBroker();
+        // 1. verify the sub bundles is correct.
+        if (bundleToDestBroker.size() != 2) {
+            String msg = format("Bundle %s should be split into 2 bundles, but 
got %d bundles",
+                    serviceUnit, bundleToDestBroker.size());
+            updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+            return;
+        }
+        String namespace = bundle.getNamespaceObject().toString();
+
+        // 2. get the split boundary.
+        Set<Long> boundaries = new HashSet<>();

Review Comment:
   This conversion looks redundant. 
   Set<Long> boundaries = new HashSet<>();
   ...
   new ArrayList<>(boundaries)
   
   Like mentioned above, can we pass `List<Long> boundaries` in 
`ServiceUnitStateData`?
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to