This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 421d707419e [improve][broker] PIP-192 made split handler idempotent 
(#19988)
421d707419e is described below

commit 421d707419ee42378b9c5b37812bd5c617ef2c9a
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Apr 8 03:49:16 2023 -0700

    [improve][broker] PIP-192 made split handler idempotent (#19988)
    
    Master Issue: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    
    Raising a PR to implement: https://github.com/apache/pulsar/issues/16691
    
    ### Modifications
    This PR
    - Makes the split handler idempotent.
    - Makes the leader's orphan monitor keep retrying to send the split message until the split is finished.
    - Selects the bundle boundaries at the SplitScheduler so that every split handler retry uses the same split boundaries.
    - Adds a split condition to check if the parent's Splitting state has moved.
    - Makes the Admin Unload command forceful so that it can unload bundles that are in invalid states.
---
 conf/broker.conf                                   |   2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   2 +-
 .../extensions/ExtensibleLoadManagerImpl.java      |   2 +-
 .../channel/ServiceUnitStateChannelImpl.java       | 263 +++++++++-----
 .../extensions/channel/ServiceUnitStateData.java   |  15 +
 .../loadbalance/extensions/models/Split.java       |   6 +-
 .../loadbalance/extensions/models/Unload.java      |  10 +-
 .../DefaultNamespaceBundleSplitStrategyImpl.java   |  76 +++-
 .../pulsar/broker/namespace/NamespaceService.java  |  10 +-
 .../common/naming/NamespaceBundleFactory.java      |   8 +-
 .../naming/NamespaceBundleSplitAlgorithm.java      |   3 +
 .../SpecifiedPositionsBundleSplitAlgorithm.java    |  47 ++-
 .../extensions/ExtensibleLoadManagerImplTest.java  |   1 +
 .../channel/ServiceUnitStateChannelTest.java       | 399 ++++++++++++++++++---
 .../extensions/channel/models/SplitTest.java       |   8 +-
 .../extensions/scheduler/SplitSchedulerTest.java   |  19 +-
 .../DefaultNamespaceBundleSplitStrategyTest.java   |  81 ++++-
 ...SpecifiedPositionsBundleSplitAlgorithmTest.java |  15 +
 18 files changed, 767 insertions(+), 200 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1bbb12313f9..414756abcb7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1455,7 +1455,7 @@ 
loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
 # the service-unit state channel when there are a large number of bundles.
 # minimum value = 30 secs
 # (only used in load balancer extension logics)
-loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds=604800
+loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
 
 
 ### --- Replication --- ###
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 52cebe15f6a..c86358b9b5b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2609,7 +2609,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                     + "minimum value = 30 secs"
                     + "(only used in load balancer extension logics)"
     )
-    private long loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds = 
604800;
+    private long loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds = 
3600;
 
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index c1234b6dab2..e054a1bc662 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -449,7 +449,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                         log.warn(msg);
                         throw new IllegalArgumentException(msg);
                     }
-                    Unload unload = new Unload(sourceBroker, 
bundle.toString(), destinationBroker);
+                    Unload unload = new Unload(sourceBroker, 
bundle.toString(), destinationBroker, true);
                     UnloadDecision unloadDecision =
                             new UnloadDecision(unload, 
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
                     return unloadAsync(unloadDecision,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ec79698db1f..bd62c53be60 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -46,7 +46,6 @@ import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -61,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -200,7 +200,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 CompletableFuture<String>>newBuilder().build();
         this.cleanupJobs = ConcurrentOpenHashMap.<String, 
CompletableFuture<Void>>newBuilder().build();
         this.stateChangeListeners = new StateChangeListeners();
-        this.semiTerminalStateWaitingTimeInMillis = 
config.getLoadBalancerServiceUnitStateCleanUpDelayTimeInSeconds()
+        this.semiTerminalStateWaitingTimeInMillis = 
config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
                 * 1000;
         this.inFlightStateWaitingTimeInMillis = 
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
         this.ownershipMonitorDelayTimeInSecs = 
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
@@ -563,10 +563,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         ServiceUnitStateData next;
         if (isTransferCommand(unload)) {
             next = new ServiceUnitStateData(
-                    Releasing, unload.destBroker().get(), 
unload.sourceBroker(), getNextVersionId(serviceUnit));
+                    Releasing, unload.destBroker().get(), 
unload.sourceBroker(),
+                    unload.force(), getNextVersionId(serviceUnit));
         } else {
             next = new ServiceUnitStateData(
-                    Releasing, null, unload.sourceBroker(), 
getNextVersionId(serviceUnit));
+                    Releasing, null, unload.sourceBroker(), unload.force(), 
getNextVersionId(serviceUnit));
         }
         return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
             if (ex != null) {
@@ -660,7 +661,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 long handlerTotalCount = getHandlerTotalCounter(data).get();
                 long handlerFailureCount = 
getHandlerFailureCounter(data).get();
                 log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}, "
-                                + "totalHandledRequests{}, 
totalFailedRequests:{}",
+                                + "totalHandledRequests:{}, 
totalFailedRequests:{}",
                         lookupServiceAddress, getLogEventTag(data), 
serviceUnit,
                         data == null ? "" : data,
                         next == null ? "" : next,
@@ -671,7 +672,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             long handlerTotalCount = getHandlerTotalCounter(data).get();
             long handlerFailureCount = 
getHandlerFailureCounter(data).incrementAndGet();
             log.error("{} failed to handle {} event for serviceUnit:{}, 
cur:{}, next:{}, "
-                            + "totalHandledRequests{}, totalFailedRequests:{}",
+                            + "totalHandledRequests:{}, 
totalFailedRequests:{}",
                     lookupServiceAddress, getLogEventTag(data), serviceUnit,
                     data == null ? "" : data,
                     next == null ? "" : next,
@@ -857,110 +858,158 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
             });
             boundaries = new ArrayList<>(boundariesSet);
-            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+            nsBundleSplitAlgorithm = 
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
         }
         final AtomicInteger counter = new AtomicInteger(0);
+        var childBundles = 
data.splitServiceUnitToDestBroker().keySet().stream()
+                .map(child -> bundleFactory.getBundle(
+                        bundle.getNamespaceObject().toString(), child))
+                .collect(Collectors.toList());
         this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, 
nsBundleSplitAlgorithm,
-                bundle, boundaries, serviceUnit, data, counter, startTime, 
completionFuture);
+                bundle, childBundles, boundaries, data, counter, startTime, 
completionFuture);
         return completionFuture;
     }
 
+
+
     @VisibleForTesting
     protected void splitServiceUnitOnceAndRetry(NamespaceService 
namespaceService,
                                                 NamespaceBundleFactory 
bundleFactory,
                                                 NamespaceBundleSplitAlgorithm 
algorithm,
-                                                NamespaceBundle bundle,
+                                                NamespaceBundle parentBundle,
+                                                List<NamespaceBundle> 
childBundles,
                                                 List<Long> boundaries,
-                                                String serviceUnit,
-                                                ServiceUnitStateData data,
+                                                ServiceUnitStateData 
parentData,
                                                 AtomicInteger counter,
                                                 long startTime,
                                                 CompletableFuture<Void> 
completionFuture) {
-        CompletableFuture<List<NamespaceBundle>> updateFuture = new 
CompletableFuture<>();
-
-        namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
-                .thenAccept(splitBundlesPair -> {
-            // Split and updateNamespaceBundles. Update may fail because of 
concurrent write to Zookeeper.
-            if (splitBundlesPair == null) {
-                String msg = format("Bundle %s not found under namespace", 
serviceUnit);
-                updateFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
-                return;
-            }
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.sourceBroker(), VERSION_ID_INIT);
-            NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
-            List<NamespaceBundle> splitBundles = 
Collections.unmodifiableList(splitBundlesPair.getRight());
-            List<NamespaceBundle> successPublishedBundles =
-                    Collections.synchronizedList(new 
ArrayList<>(splitBundles.size()));
-            List<CompletableFuture<Void>> futures = new 
ArrayList<>(splitBundles.size());
-            for (NamespaceBundle sBundle : splitBundles) {
-                futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ 
-> successPublishedBundles.add(sBundle)));
-            }
-            NamespaceName nsname = bundle.getNamespaceObject();
-            FutureUtil.waitForAll(futures)
-                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
-                    .thenCompose(__ -> 
namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
-                    .thenRun(() -> {
-                        
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-                        updateFuture.complete(splitBundles);
-                    }).exceptionally(e -> {
-                        // Clean the new bundle when has exception.
-                        List<CompletableFuture<Void>> futureList = new 
ArrayList<>();
-                        for (NamespaceBundle sBundle : 
successPublishedBundles) {
-                            
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
-                        }
-                        FutureUtil.waitForAll(futureList)
-                                .whenComplete((__, ex) -> {
-                                    if (ex != null) {
-                                        log.warn("Clean new bundles failed,", 
ex);
-                                    }
-                                    updateFuture.completeExceptionally(e);
-                                });
-                        return null;
-                    });
-        }).exceptionally(e -> {
-            updateFuture.completeExceptionally(e);
-            return null;
-        });
+        ownChildBundles(childBundles, parentData)
+                .thenCompose(__ -> getSplitNamespaceBundles(
+                        namespaceService, bundleFactory, algorithm, 
parentBundle, childBundles, boundaries))
+                .thenCompose(namespaceBundles -> 
updateSplitNamespaceBundlesAsync(
+                        namespaceService, bundleFactory, parentBundle, 
namespaceBundles))
+                .thenAccept(__ -> // Update bundled_topic cache for 
load-report-generation
+                        
pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))
+                .thenAccept(__ -> pubAsync(parentBundle.toString(), new 
ServiceUnitStateData(
+                                Deleted, null, parentData.sourceBroker(), 
getNextVersionId(parentData))))
+                .thenAccept(__ -> {
+                    double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+                    log.info("Successfully split {} parent namespace-bundle to 
{} in {} ms",
+                            parentBundle, childBundles, splitBundleTime);
+                    completionFuture.complete(null);
+                })
+                .exceptionally(ex -> {
+                    // Retry several times on BadVersion
+                    Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
+                    if ((throwable instanceof 
MetadataStoreException.BadVersionException)
+                            && (counter.incrementAndGet() < 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
+                        log.warn("Failed to update bundle range in metadata 
store. Retrying {} th / {} limit",
+                                counter.get(), 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
+                        pulsar.getExecutor().schedule(() -> 
splitServiceUnitOnceAndRetry(
+                                namespaceService, bundleFactory, algorithm, 
parentBundle, childBundles,
+                                        boundaries, parentData, counter, 
startTime, completionFuture),
+                                100, MILLISECONDS);
+                    } else {
+                        // Retry enough, or meet other exception
+                        String msg = format("Failed to split bundle %s, 
Retried %d th / %d limit, reason %s",
+                                parentBundle.toString(), counter.get(),
+                                NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, 
throwable.getMessage());
+                        log.warn(msg, throwable);
+                        completionFuture.completeExceptionally(
+                                new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+                    }
+                    return null;
+                });
+    }
 
-        updateFuture.thenAccept(r -> {
-            // Delete the old bundle
-            pubAsync(serviceUnit, new ServiceUnitStateData(
-                    Deleted, null, data.sourceBroker(), 
getNextVersionId(data)))
-                    .thenRun(() -> {
-                // Update bundled_topic cache for load-report-generation
-                pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
-                // TODO: Update the load data immediately if needed.
-                completionFuture.complete(null);
-                double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
-                log.info("Successfully split {} parent namespace-bundle to {} 
in {} ms", serviceUnit, r,
-                        splitBundleTime);
-            }).exceptionally(e -> {
-                double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
-                String msg = format("Failed to free bundle %s in %s ms, under 
namespace [%s] with error %s",
-                        bundle.getNamespaceObject().toString(), 
splitBundleTime, bundle, e.getMessage());
-                completionFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
-                return null;
-            });
-        }).exceptionally(ex -> {
-            // Retry several times on BadVersion
-            Throwable throwable = FutureUtil.unwrapCompletionException(ex);
-            if ((throwable instanceof 
MetadataStoreException.BadVersionException)
-                    && (counter.incrementAndGet() < 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
-                log.warn("Failed to update bundle range in metadata store. 
Retrying {} th / {} limit",
-                        counter.get(), 
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
-                pulsar.getExecutor().schedule(() -> 
splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
-                        algorithm, bundle, boundaries, serviceUnit, data, 
counter, startTime, completionFuture),
-                        100, MILLISECONDS);
-            } else if (throwable instanceof IllegalArgumentException) {
-                completionFuture.completeExceptionally(throwable);
+    private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle> 
childBundles,
+                                                    ServiceUnitStateData 
parentData) {
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(childBundles.size());
+        var debug = debug();
+        for (var childBundle : childBundles) {
+            var childBundleStr = childBundle.toString();
+            var childData = tableview.get(childBundleStr);
+            if (childData != null) {
+                if (debug) {
+                    log.info("Already owned child bundle:{}", childBundleStr);
+                }
             } else {
-                // Retry enough, or meet other exception
-                String msg = format("Bundle: %s not success update nsBundles, 
counter %d, reason %s",
-                        bundle.toString(), counter.get(), 
throwable.getMessage());
-                completionFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(msg));
+                childData = new ServiceUnitStateData(Owned, 
parentData.sourceBroker(),
+                        VERSION_ID_INIT);
+                futures.add(pubAsync(childBundleStr, childData).thenApply(__ 
-> null));
             }
-            return null;
-        });
+        }
+
+        if (!futures.isEmpty()) {
+            return FutureUtil.waitForAll(futures);
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private CompletableFuture<NamespaceBundles> 
getSplitNamespaceBundles(NamespaceService namespaceService,
+                                                                         
NamespaceBundleFactory bundleFactory,
+                                                                         
NamespaceBundleSplitAlgorithm algorithm,
+                                                                         
NamespaceBundle parentBundle,
+                                                                         
List<NamespaceBundle> childBundles,
+                                                                         
List<Long> boundaries) {
+        CompletableFuture future = new CompletableFuture();
+        final var debug = debug();
+        var targetNsBundle = 
bundleFactory.getBundles(parentBundle.getNamespaceObject());
+        boolean found = false;
+        try {
+            targetNsBundle.validateBundle(parentBundle);
+        } catch (IllegalArgumentException e) {
+            if (debug) {
+                log.info("Namespace bundles do not contain the parent 
bundle:{}",
+                        parentBundle);
+            }
+            for (var childBundle : childBundles) {
+                try {
+                    targetNsBundle.validateBundle(childBundle);
+                    if (debug) {
+                        log.info("Namespace bundles contain the child 
bundle:{}",
+                                childBundle);
+                    }
+                } catch (Exception ex) {
+                    future.completeExceptionally(
+                            new 
BrokerServiceException.ServiceUnitNotReadyException(
+                                    "Namespace bundles do not contain the 
child bundle:" + childBundle, e));
+                    return future;
+                }
+            }
+            found = true;
+        } catch (Exception e) {
+            future.completeExceptionally(
+                    new BrokerServiceException.ServiceUnitNotReadyException(
+                            "Failed to validate the parent bundle in the 
namespace bundles.", e));
+            return future;
+        }
+        if (found) {
+            future.complete(targetNsBundle);
+            return future;
+        } else {
+            return namespaceService.getSplitBoundary(parentBundle, algorithm, 
boundaries)
+                    .thenApply(splitBundlesPair -> splitBundlesPair.getLeft());
+        }
+    }
+
+    private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
+            NamespaceService namespaceService,
+            NamespaceBundleFactory bundleFactory,
+            NamespaceBundle parentBundle,
+            NamespaceBundles splitNamespaceBundles) {
+        var namespaceName = parentBundle.getNamespaceObject();
+        return namespaceService.updateNamespaceBundles(
+                        namespaceName, splitNamespaceBundles)
+                .thenCompose(__ -> 
namespaceService.updateNamespaceBundlesForPolicies(
+                        namespaceName, splitNamespaceBundles))
+                .thenAccept(__ -> {
+                    
bundleFactory.invalidateBundleCache(parentBundle.getNamespaceObject());
+                    if (debug()) {
+                        log.info("Successfully updated split namespace bundles 
and namespace bundle cache.");
+                    }
+                });
     }
 
     public void handleMetadataSessionEvent(SessionEvent e) {
@@ -1053,11 +1102,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 broker, delayInSecs, cleanupJobs.size());
     }
 
+
+    private ServiceUnitStateData 
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
+                                                                    String 
selectedBroker) {
+        if (orphanData.state() == Splitting) {
+            return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), 
selectedBroker,
+                    Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
+                    true, getNextVersionId(orphanData));
+        } else {
+            return new ServiceUnitStateData(Owned, selectedBroker, true, 
getNextVersionId(orphanData));
+        }
+    }
+
     private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData) {
+
         Optional<String> selectedBroker = selectBroker(serviceUnit);
         if (selectedBroker.isPresent()) {
-            var override =
-                    new ServiceUnitStateData(Owned, selectedBroker.get(), 
true, getNextVersionId(orphanData));
+            var override = getOverrideInactiveBrokerStateData(orphanData, 
selectedBroker.get());
             log.info("Overriding ownership serviceUnit:{} from orphanData:{} 
to overrideData:{}",
                     serviceUnit, orphanData, override);
             publishOverrideEventAsync(serviceUnit, orphanData, override)
@@ -1157,7 +1218,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             case Assigning: {
                 return getRollForwardStateData(serviceUnit, nextVersionId);
             }
-            case Splitting, Releasing: {
+            case Splitting: {
+                return Optional.of(new ServiceUnitStateData(Splitting,
+                        orphanData.dstBroker(), orphanData.sourceBroker(),
+                        Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
+                        true, nextVersionId));
+            }
+            case Releasing: {
                 if (availableBrokers.contains(orphanData.sourceBroker())) {
                     // rollback to the src
                     return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true, nextVersionId));
@@ -1219,7 +1286,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     inactiveBrokers.add(dstBroker);
                 } else if (isInFlightState(state)
                         && now - stateData.timestamp() > 
inFlightStateWaitingTimeInMillis) {
-                    log.warn("Found orphan serviceUnit:{}, stateData:{}", 
serviceUnit, stateData);
                     orphanServiceUnits.put(serviceUnit, stateData);
                 }
             } else if (now - stateData.timestamp() > 
semiTerminalStateWaitingTimeInMillis) {
@@ -1249,6 +1315,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 var overrideData = getOverrideInFlightStateData(
                         orphanServiceUnit, orphanData, activeBrokers);
                 if (overrideData.isPresent()) {
+                    log.info("Overriding in-flight state ownership 
serviceUnit:{} "
+                                    + "from orphanData:{} to overrideData:{}",
+                            orphanServiceUnit, orphanData, overrideData);
                     publishOverrideEventAsync(orphanServiceUnit, orphanData, 
overrideData.get())
                             .whenComplete((__, e) -> {
                                 if (e != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 52c7e27d650..307d3a4acb1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -45,10 +45,25 @@ public record ServiceUnitStateData(
                 System.currentTimeMillis(), versionId);
     }
 
+    public ServiceUnitStateData(ServiceUnitState state, String dstBroker, 
String sourceBroker,
+                                Map<String, Optional<String>> 
splitServiceUnitToDestBroker, boolean force,
+                                long versionId) {
+        this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, 
force,
+                System.currentTimeMillis(), versionId);
+    }
+
     public ServiceUnitStateData(ServiceUnitState state, String dstBroker, 
String sourceBroker, long versionId) {
         this(state, dstBroker, sourceBroker, null, false, 
System.currentTimeMillis(), versionId);
     }
 
+    public ServiceUnitStateData(ServiceUnitState state, String dstBroker, 
String sourceBroker, boolean force,
+                                long versionId) {
+        this(state, dstBroker, sourceBroker, null, force,
+                System.currentTimeMillis(), versionId);
+    }
+
+
+
     public ServiceUnitStateData(ServiceUnitState state, String dstBroker, long 
versionId) {
         this(state, dstBroker, null, null, false, System.currentTimeMillis(), 
versionId);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
index ac9a36e6dbf..690fac59bc9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java
@@ -30,12 +30,8 @@ public record Split(
 
     public Split {
         Objects.requireNonNull(serviceUnit);
-        if (splitServiceUnitToDestBroker != null && 
splitServiceUnitToDestBroker.size() != 2) {
+        if (splitServiceUnitToDestBroker == null || 
splitServiceUnitToDestBroker.size() != 2) {
             throw new IllegalArgumentException("Split service unit should be 
split into 2 service units.");
         }
     }
-
-    public Split(String serviceUnit, String sourceBroker) {
-        this(serviceUnit, sourceBroker, null);
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
index d474011919d..753f8a942ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java
@@ -24,12 +24,18 @@ import java.util.Optional;
 /**
  * Defines the information required to unload or transfer a service unit(e.g. 
bundle).
  */
-public record Unload(String sourceBroker, String serviceUnit, Optional<String> 
destBroker) {
+public record Unload(String sourceBroker, String serviceUnit, Optional<String> 
destBroker, boolean force) {
     public Unload {
         Objects.requireNonNull(sourceBroker);
         Objects.requireNonNull(serviceUnit);
     }
+
+
     public Unload(String sourceBroker, String serviceUnit) {
-        this(sourceBroker, serviceUnit, Optional.empty());
+        this(sourceBroker, serviceUnit, Optional.empty(), false);
+    }
+
+    public Unload(String sourceBroker, String serviceUnit, Optional<String> 
destBroker) {
+        this(sourceBroker, serviceUnit, destBroker, false);
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
index bd8b3aa6654..7875c07b122 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -26,18 +26,23 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecisi
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 
@@ -53,12 +58,14 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
     private final Set<SplitDecision> decisionCache;
     private final Map<String, Integer> namespaceBundleCount;
     private final Map<String, Integer> splitConditionHitCounts;
+    private final Map<String, String> splittingBundles;
     private final SplitCounter counter;
 
     public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
         decisionCache = new HashSet<>();
         namespaceBundleCount = new HashMap<>();
         splitConditionHitCounts = new HashMap<>();
+        splittingBundles = new HashMap<>();
         this.counter = counter;
 
     }
@@ -67,6 +74,7 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
     public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, 
PulsarService pulsar) {
         decisionCache.clear();
         namespaceBundleCount.clear();
+        splittingBundles.clear();
         final ServiceConfiguration conf = pulsar.getConfiguration();
         int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
         long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
@@ -78,6 +86,15 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
         boolean debug = log.isDebugEnabled() || 
conf.isLoadBalancerDebugModeEnabled();
         var channel = ServiceUnitStateChannelImpl.get(pulsar);
 
+        for (var etr : channel.getOwnershipEntrySet()) {
+            var eData = etr.getValue();
+            if (eData.state() == ServiceUnitState.Splitting) {
+                String bundle = etr.getKey();
+                final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                splittingBundles.put(bundle, bundleRange);
+            }
+        }
+
         Map<String, NamespaceBundleStats> bundleStatsMap = 
pulsar.getBrokerService().getBundleStats();
         NamespaceBundleFactory namespaceBundleFactory =
                 pulsar.getNamespaceService().getNamespaceBundleFactory();
@@ -177,6 +194,27 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
                 continue;
             }
 
+            var ranges = bundleRange.split("_");
+            var foundSplittingBundle = false;
+            for (var etr : splittingBundles.entrySet()) {
+                var splittingBundle = etr.getKey();
+                if (splittingBundle.startsWith(namespace)) {
+                    var splittingBundleRange = etr.getValue();
+                    if (splittingBundleRange.startsWith(ranges[0])
+                            || splittingBundleRange.endsWith(ranges[1])) {
+                        if (debug) {
+                            log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
+                                    + " (parent) bundle:%s is in Splitting 
state.", bundle, splittingBundle));
+                        }
+                        foundSplittingBundle = true;
+                        break;
+                    }
+                }
+            }
+            if (foundSplittingBundle) {
+                continue;
+            }
+
             if (debug) {
                 log.info(String.format(
                         "Splitting bundle: %s. "
@@ -193,7 +231,43 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
                 ));
             }
             var decision = new SplitDecision();
-            decision.setSplit(new Split(bundle, 
context.brokerRegistry().getBrokerId()));
+            var namespaceService = pulsar.getNamespaceService();
+            var namespaceBundle = namespaceService.getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            NamespaceBundleSplitAlgorithm algorithm =
+                    namespaceService.getNamespaceBundleSplitAlgorithmByName(
+                            conf.getDefaultNamespaceBundleSplitAlgorithm());
+            List<Long> splitBoundary = null;
+            try {
+                splitBoundary = namespaceService
+                        .getSplitBoundary(namespaceBundle,  null, algorithm)
+                        .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            } catch (Throwable e) {
+                counter.update(Failure, Unknown);
+                log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to 
get split boundaries.", bundle, e));
+                continue;
+            }
+            if (splitBoundary == null) {
+                counter.update(Failure, Unknown);
+                log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The split 
boundaries is null.", bundle));
+                continue;
+            }
+            if (splitBoundary.size() != 1) {
+                counter.update(Failure, Unknown);
+                log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The size of 
split boundaries is not 1. "
+                        + "splitBoundary:%s", bundle, splitBoundary));
+                continue;
+            }
+
+            var parentRange = namespaceBundle.getKeyRange();
+            var leftChildBundle = 
namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
+                    
NamespaceBundleFactory.getRange(parentRange.lowerEndpoint(), 
splitBoundary.get(0)));
+            var rightChildBundle = 
namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
+                    NamespaceBundleFactory.getRange(splitBoundary.get(0), 
parentRange.upperEndpoint()));
+            Map<String, Optional<String>> splitServiceUnitToDestBroker = 
Map.of(
+                    leftChildBundle.getBundleRange(), Optional.empty(),
+                    rightChildBundle.getBundleRange(), Optional.empty());
+            decision.setSplit(new Split(bundle, 
context.brokerRegistry().getBrokerId(), splitServiceUnitToDestBroker));
             decision.succeed(reason);
             decisionCache.add(decision);
             int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d092ef04018..c9cd33d6d87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -967,9 +967,7 @@ public class NamespaceService implements AutoCloseable {
      */
     public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> 
getSplitBoundary(
             NamespaceBundle bundle, NamespaceBundleSplitAlgorithm 
nsBundleSplitAlgorithm, List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, 
boundaries, config);
-        CompletableFuture<List<Long>> splitBoundary =
-                nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
+        CompletableFuture<List<Long>> splitBoundary = getSplitBoundary(bundle, 
boundaries, nsBundleSplitAlgorithm);
         return splitBoundary.thenCompose(splitBoundaries -> {
                     if (splitBoundaries == null || splitBoundaries.size() == 
0) {
                         LOG.info("[{}] No valid boundary found in {} to split 
bundle {}",
@@ -981,6 +979,12 @@ public class NamespaceService implements AutoCloseable {
                 });
     }
 
+    public CompletableFuture<List<Long>> getSplitBoundary(
+            NamespaceBundle bundle, List<Long> boundaries, 
NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
+        BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, 
boundaries, config);
+        return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
+    }
+
     private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
                                                    List<Long> boundaries,
                                                    ServiceConfiguration 
config) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index f0f7deb940d..937d2763767 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -272,8 +272,7 @@ public class NamespaceBundleFactory {
         String[] boundaries = bundleRange.split("_");
         Long lowerEndpoint = Long.decode(boundaries[0]);
         Long upperEndpoint = Long.decode(boundaries[1]);
-        Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, 
upperEndpoint,
-                (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? 
BoundType.CLOSED : BoundType.OPEN);
+        Range<Long> hashRange = getRange(lowerEndpoint, upperEndpoint);
         return getBundle(NamespaceName.get(namespace), hashRange);
     }
 
@@ -414,4 +413,9 @@ public class NamespaceBundleFactory {
         return Joiner.on("/").join(i);
     }
 
+    public static Range<Long> getRange(Long lowerEndpoint, Long upperEndpoint) 
{
+        return Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
+                (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? 
BoundType.CLOSED : BoundType.OPEN);
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index 01a61d81665..1fd6fbcd6ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -39,6 +39,9 @@ public interface NamespaceBundleSplitAlgorithm {
     NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new 
TopicCountEquallyDivideBundleSplitAlgorithm();
     NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
             new SpecifiedPositionsBundleSplitAlgorithm();
+
+    NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO =
+            new SpecifiedPositionsBundleSplitAlgorithm(true);
     NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new 
FlowOrQpsEquallyDivideBundleSplitAlgorithm();
 
     static NamespaceBundleSplitAlgorithm of(String algorithmName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
index 1fd9a85cfa7..4a5f8a831c2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
@@ -29,6 +29,16 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
  * This algorithm divides the bundle into several parts by the specified 
positions.
  */
 public class SpecifiedPositionsBundleSplitAlgorithm implements 
NamespaceBundleSplitAlgorithm{
+
+    private boolean force;
+
+    public SpecifiedPositionsBundleSplitAlgorithm() {
+        force = false;
+    }
+
+    public SpecifiedPositionsBundleSplitAlgorithm(boolean force) {
+        this.force = force;
+    }
     @Override
     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption 
bundleSplitOption) {
         NamespaceService service = bundleSplitOption.getService();
@@ -39,19 +49,28 @@ public class SpecifiedPositionsBundleSplitAlgorithm 
implements NamespaceBundleSp
         }
         // sort all positions
         Collections.sort(positions);
-        return 
service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
-            if (topics == null || topics.size() <= 1) {
-                return CompletableFuture.completedFuture(null);
-            }
-            List<Long> splitBoundaries = positions
-                    .stream()
-                    .filter(position -> position > bundle.getLowerEndpoint() 
&& position < bundle.getUpperEndpoint())
-                    .collect(Collectors.toList());
-
-            if (splitBoundaries.size() == 0) {
-                return CompletableFuture.completedFuture(null);
-            }
-            return CompletableFuture.completedFuture(splitBoundaries);
-        });
+        if (force) {
+            return getBoundaries(bundle, positions);
+        } else {
+            return service.getOwnedTopicListForNamespaceBundle(bundle)
+                    .thenCompose(topics -> {
+                        if (topics == null || topics.size() <= 1) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                        return getBoundaries(bundle, positions);
+                    });
+        }
+    }
+
+    private CompletableFuture<List<Long>> getBoundaries(NamespaceBundle 
bundle, List<Long> positions) {
+        List<Long> splitBoundaries = positions
+                .stream()
+                .filter(position -> position > bundle.getLowerEndpoint() && 
position < bundle.getUpperEndpoint())
+                .collect(Collectors.toList());
+
+        if (splitBoundaries.size() == 0) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return CompletableFuture.completedFuture(splitBoundaries);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 57590d8d7da..ae7ceeed928 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -133,6 +133,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
         conf.setLoadBalancerSheddingEnabled(false);
+        conf.setLoadBalancerDebugModeEnabled(true);
         super.internalSetup(conf);
         pulsar1 = pulsar;
         ServiceConfiguration defaultConf = getDefaultConf();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 2d98bc5cae6..77c80187a63 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -107,9 +107,16 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     private String lookupServiceAddress1;
     private String lookupServiceAddress2;
     private String bundle;
-
     private String bundle1;
     private String bundle2;
+    private String bundle3;
+    private String childBundle1Range;
+    private String childBundle2Range;
+    private String childBundle11;
+    private String childBundle12;
+
+    private String childBundle31;
+    private String childBundle32;
     private PulsarTestContext additionalPulsarTestContext;
 
     private LoadManagerContext loadManagerContext;
@@ -122,6 +129,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     @Override
     protected void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
+        conf.setLoadBalancerDebugModeEnabled(true);
         conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
         super.internalSetup(conf);
 
@@ -149,15 +157,23 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         lookupServiceAddress2 = (String)
                 FieldUtils.readDeclaredField(channel2, "lookupServiceAddress", 
true);
 
-        bundle = String.format("%s/%s", "public/default", 
"0x00000000_0xffffffff");
-        bundle1 = String.format("%s/%s", "public/default", 
"0x00000000_0xfffffff0");
-        bundle2 = String.format("%s/%s", "public/default", 
"0xfffffff0_0xffffffff");
+        bundle = "public/default/0x00000000_0xffffffff";
+        bundle1 = "public/default/0x00000000_0xfffffff0";
+        bundle2 = "public/default/0xfffffff0_0xffffffff";
+        bundle3 = "public/default3/0x00000000_0xffffffff";
+        childBundle1Range = "0x7fffffff_0xffffffff";
+        childBundle2Range = "0x00000000_0x7fffffff";
+
+        childBundle11 = "public/default/" + childBundle1Range;
+        childBundle12 = "public/default/" + childBundle2Range;
+
+        childBundle31 = "public/default3/" + childBundle1Range;
+        childBundle32 = "public/default3/" + childBundle2Range;
     }
 
     @BeforeMethod
     protected void initChannels() throws Exception {
-        cleanTableView(channel1, bundle);
-        cleanTableView(channel2, bundle);
+        cleanTableViews();
         cleanOwnershipMonitorCounters(channel1);
         cleanOwnershipMonitorCounters(channel2);
         cleanOpsCounters(channel1);
@@ -299,7 +315,9 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         try {
-            channel.publishSplitEventAsync(new Split(bundle, 
lookupServiceAddress1))
+            Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+                    childBundle1Range, Optional.empty(), childBundle2Range, 
Optional.empty()));
+            channel.publishSplitEventAsync(split)
                     .get(2, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalStateException) {
@@ -496,7 +514,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(0, getOwnerRequests2.size());
 
         // recovered, check the monitor update state : Assigned -> Owned
-        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))).when(loadManager).selectAsync(any());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+                .when(loadManager).selectAsync(any());
         FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1 , true);
@@ -564,32 +583,33 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", 
"test-topic-2")))
                 
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
 
-        Split split = new Split(bundle, ownerAddr1.get());
+        // Publish a split that carries the pre-selected child bundle ranges.
+
+        Split split = new Split(bundle, ownerAddr1.get(), Map.of(
+                childBundle1Range, Optional.empty(), childBundle2Range, 
Optional.empty()));
         channel1.publishSplitEventAsync(split);
 
         waitUntilState(channel1, bundle, Deleted);
         waitUntilState(channel2, bundle, Deleted);
 
-        validateHandlerCounters(channel1, 1, 0, 9, 0, 0, 0, 1, 0, 0, 0, 6, 0, 
1, 0);
-        validateHandlerCounters(channel2, 1, 0, 9, 0, 0, 0, 1, 0, 0, 0, 6, 0, 
1, 0);
+        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 
1, 0);
+        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 
1, 0);
         validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
         validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
         // Verify the retry count
         verify(((ServiceUnitStateChannelImpl) channel1), 
times(badVersionExceptionCount + 1))
                 .splitServiceUnitOnceAndRetry(any(), any(), any(), any(), 
any(), any(), any(), any(), anyLong(), any());
 
-        // Assert child bundle ownerships in the channels.
-        String childBundle1 = "public/default/0x7fffffff_0xffffffff";
-        String childBundle2 = "public/default/0x00000000_0x7fffffff";
 
-        waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1);
-        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle1).get());
-        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle2).get());
-        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle1).get());
-        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle2).get());
+
+        waitUntilNewOwner(channel1, childBundle11, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, childBundle12, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress1);
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle11).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle12).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle11).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle12).get());
 
 
         // try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -620,10 +640,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 0,
                 0);
 
-        cleanTableView(channel1, childBundle1);
-        cleanTableView(channel2, childBundle1);
-        cleanTableView(channel1, childBundle2);
-        cleanTableView(channel2, childBundle2);
+        cleanTableView(channel1, childBundle11);
+        cleanTableView(channel2, childBundle11);
+        cleanTableView(channel1, childBundle12);
+        cleanTableView(channel2, childBundle12);
 
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -714,8 +734,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
-        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
-
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+                .when(loadManager).selectAsync(any());
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
@@ -916,8 +936,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         Awaitility.await()
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(140, TimeUnit.SECONDS)
-                .untilAsserted(() -> verify(compactor, times(1))
-                        .compact(eq(ServiceUnitStateChannelImpl.TOPIC), 
any()));
+                .untilAsserted(() -> {
+                    channel1.publishAssignEventAsync(bundle, 
lookupServiceAddress1);
+                    verify(compactor, times(1))
+                            .compact(eq(ServiceUnitStateChannelImpl.TOPIC), 
any());
+                });
+
 
         var channel3 = createChannel(pulsar);
         channel3.start();
@@ -1049,7 +1073,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(priority = 13)
-    public void assignTestWhenDestBrokerFails()
+    public void assignTestWhenDestBrokerProducerFails()
             throws ExecutionException, InterruptedException, 
IllegalAccessException {
 
         Unload unload = new Unload(lookupServiceAddress1, bundle, 
Optional.empty());
@@ -1076,7 +1100,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+                .when(loadManager).selectAsync(any());
         channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
         // channel1 is broken. the assign won't be complete.
         waitUntilState(channel1, bundle);
@@ -1132,7 +1157,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(priority = 14)
-    public void splitTestWhenDestBrokerFails()
+    public void splitTestWhenProducerFails()
             throws ExecutionException, InterruptedException, 
IllegalAccessException {
 
 
@@ -1165,7 +1190,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        channel2.publishSplitEventAsync(new Split(bundle, 
lookupServiceAddress1, null));
+        // Publish a split that carries the pre-selected child bundle ranges.
+
+
+        Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+                childBundle1Range, Optional.empty(), childBundle2Range, 
Optional.empty()));
+        channel2.publishSplitEventAsync(split);
         // channel1 is broken. the split won't be complete.
         waitUntilState(channel1, bundle);
         waitUntilState(channel2, bundle);
@@ -1180,29 +1210,19 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 1 , true);
 
-        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
-        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
 
+        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
-        var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
-        var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
+        waitUntilStateWithMonitor(leader, bundle, Deleted);
+        waitUntilStateWithMonitor(channel1, bundle, Deleted);
+        waitUntilStateWithMonitor(channel2, bundle, Deleted);
 
-        assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        var ownerAddr1 = channel1.getOwnerAsync(bundle);
+        var ownerAddr2 = channel2.getOwnerAsync(bundle);
+
+        assertTrue(ownerAddr1.isCompletedExceptionally());
+        assertTrue(ownerAddr2.isCompletedExceptionally());
 
-        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
-        validateMonitorCounters(leader,
-                0,
-                0,
-                1,
-                0,
-                0,
-                0,
-                0);
 
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -1273,6 +1293,243 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(channel1.isOwner(bundle));
     }
 
+    @Test(priority = 16)
+    public void splitAndRetryFailureTest() throws Exception {
+        channel1.publishAssignEventAsync(bundle3, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, bundle3, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, bundle3, lookupServiceAddress1);
+        var ownerAddr1 = channel1.getOwnerAsync(bundle3).get();
+        var ownerAddr2 = channel2.getOwnerAsync(bundle3).get();
+        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+        assertTrue(ownerAddr1.isPresent());
+
+        NamespaceService namespaceService = spy(pulsar1.getNamespaceService());
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        int badVersionExceptionCount = 10;
+        AtomicInteger count = new AtomicInteger(badVersionExceptionCount);
+        future.completeExceptionally(new 
MetadataStoreException.BadVersionException("BadVersion"));
+        doAnswer(invocationOnMock -> {
+            if (count.decrementAndGet() > 0) {
+                return future;
+            }
+            // Last injected failure: reset the spy so later calls reach the real method.
+            reset(namespaceService);
+            doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", 
"test-topic-2")))
+                    
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
+            return future;
+        }).when(namespaceService).updateNamespaceBundlesForPolicies(any(), 
any());
+        doReturn(namespaceService).when(pulsar1).getNamespaceService();
+        doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", 
"test-topic-2")))
+                
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
+
+        // Publish the split; child bundle ownerships are asserted further below.
+
+        Split split = new Split(bundle3, ownerAddr1.get(), Map.of(
+                childBundle1Range, Optional.empty(), childBundle2Range, 
Optional.empty()));
+        channel1.publishSplitEventAsync(split);
+
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 1 , true);
+        FieldUtils.writeDeclaredField(channel2,
+                "inFlightStateWaitingTimeInMillis", 1 , true);
+
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    assertEquals(3, count.get());
+                });
+        var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
+        ((ServiceUnitStateChannelImpl) leader)
+                .monitorOwnerships(List.of(lookupServiceAddress1, 
lookupServiceAddress2));
+        waitUntilState(leader, bundle3, Deleted);
+        waitUntilState(channel1, bundle3, Deleted);
+        waitUntilState(channel2, bundle3, Deleted);
+
+
+        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 
1, 0);
+        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 
1, 0);
+        validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
+        validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
+
+        waitUntilNewOwner(channel1, childBundle31, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, childBundle32, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle31, lookupServiceAddress1);
+        waitUntilNewOwner(channel2, childBundle32, lookupServiceAddress1);
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle31).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(childBundle32).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle31).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(childBundle32).get());
+
+
+        // try the monitor and check the monitor moves `Deleted` -> `Init`
+
+        FieldUtils.writeDeclaredField(channel1,
+                "semiTerminalStateWaitingTimeInMillis", 1, true);
+        FieldUtils.writeDeclaredField(channel2,
+                "semiTerminalStateWaitingTimeInMillis", 1, true);
+
+        ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
+                List.of(lookupServiceAddress1, lookupServiceAddress2));
+        ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
+                List.of(lookupServiceAddress1, lookupServiceAddress2));
+        waitUntilState(channel1, bundle3, Init);
+        waitUntilState(channel2, bundle3, Init);
+
+        validateMonitorCounters(leader,
+                0,
+                1,
+                1,
+                0,
+                0,
+                0,
+                0);
+
+
+        cleanTableViews();
+
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        FieldUtils.writeDeclaredField(channel1,
+                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
+
+        FieldUtils.writeDeclaredField(channel2,
+                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        FieldUtils.writeDeclaredField(channel2,
+                "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
+    }
+
+    @Test(priority = 17)
+    public void testOverrideInactiveBrokerStateData()
+            throws IllegalAccessException, ExecutionException, 
InterruptedException, TimeoutException {
+
+        var leaderChannel = channel1;
+        var followerChannel = channel2;
+        String leader = channel1.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
+        String leader2 = channel2.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
+        assertEquals(leader, leader2);
+        if (leader.equals(lookupServiceAddress2)) {
+            leaderChannel = channel2;
+            followerChannel = channel1;
+        }
+
+        String broker = lookupServiceAddress1;
+
+        // test override states
+        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
+        String splittingBundle = bundle;
+        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
+        String freeBundle = "public/free/0xfffffff0_0xffffffff";
+        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
+        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
+        overrideTableViews(releasingBundle,
+                new ServiceUnitStateData(Releasing, null, broker, 1));
+        overrideTableViews(splittingBundle,
+                new ServiceUnitStateData(Splitting, null, broker,
+                        Map.of(childBundle1Range, Optional.empty(),
+                                childBundle2Range, Optional.empty()), 1));
+        overrideTableViews(assigningBundle,
+                new ServiceUnitStateData(Assigning, broker, null, 1));
+        overrideTableViews(freeBundle,
+                new ServiceUnitStateData(Free, null, broker, 1));
+        overrideTableViews(deletedBundle,
+                new ServiceUnitStateData(Deleted, null, broker, 1));
+        overrideTableViews(ownedBundle,
+                new ServiceUnitStateData(Owned, broker, null, 1));
+
+        // test stable metadata state
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+                .when(loadManager).selectAsync(any());
+        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+        followerChannel.handleMetadataSessionEvent(SessionReestablished);
+        FieldUtils.writeDeclaredField(leaderChannel, 
"lastMetadataSessionEventTimestamp",
+                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS 
* 1000 + 1000), true);
+        FieldUtils.writeDeclaredField(followerChannel, 
"lastMetadataSessionEventTimestamp",
+                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS 
* 1000 + 1000), true);
+        leaderChannel.handleBrokerRegistrationEvent(broker, 
NotificationType.Deleted);
+        followerChannel.handleBrokerRegistrationEvent(broker, 
NotificationType.Deleted);
+
+        waitUntilNewOwner(channel2, releasingBundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, ownedBundle, lookupServiceAddress2);
+        assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
+        
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+        
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+
+        // clean-up
+        FieldUtils.writeDeclaredField(leaderChannel, 
"maxCleanupDelayTimeInSecs", 3 * 60, true);
+        cleanTableViews();
+
+    }
+
+    @Test(priority = 18)
+    public void testOverrideOrphanStateData()
+            throws IllegalAccessException, ExecutionException, 
InterruptedException, TimeoutException {
+
+        var leaderChannel = channel1;
+        var followerChannel = channel2;
+        String leader = channel1.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
+        String leader2 = channel2.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
+        assertEquals(leader, leader2);
+        if (leader.equals(lookupServiceAddress2)) {
+            leaderChannel = channel2;
+            followerChannel = channel1;
+        }
+
+        String broker = lookupServiceAddress1;
+
+        // test override states
+        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
+        String splittingBundle = bundle;
+        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
+        String freeBundle = "public/free/0xfffffff0_0xffffffff";
+        String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
+        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
+        overrideTableViews(releasingBundle,
+                new ServiceUnitStateData(Releasing, null, broker, 1));
+        overrideTableViews(splittingBundle,
+                new ServiceUnitStateData(Splitting, null, broker,
+                        Map.of(childBundle1Range, Optional.empty(),
+                                childBundle2Range, Optional.empty()), 1));
+        overrideTableViews(assigningBundle,
+                new ServiceUnitStateData(Assigning, broker, null, 1));
+        overrideTableViews(freeBundle,
+                new ServiceUnitStateData(Free, null, broker, 1));
+        overrideTableViews(deletedBundle,
+                new ServiceUnitStateData(Deleted, null, broker, 1));
+        overrideTableViews(ownedBundle,
+                new ServiceUnitStateData(Owned, broker, null, 1));
+
+        // run the leader's ownership monitor against the overridden states
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+                .when(loadManager).selectAsync(any());
+        FieldUtils.writeDeclaredField(leaderChannel, 
"inFlightStateWaitingTimeInMillis",
+                -1, true);
+        FieldUtils.writeDeclaredField(followerChannel, 
"inFlightStateWaitingTimeInMillis",
+                -1, true);
+        ((ServiceUnitStateChannelImpl) leaderChannel)
+                .monitorOwnerships(List.of(lookupServiceAddress1, 
lookupServiceAddress2));
+
+        waitUntilNewOwner(channel2, releasingBundle, broker);
+        waitUntilNewOwner(channel2, childBundle11, broker);
+        waitUntilNewOwner(channel2, childBundle12, broker);
+        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, ownedBundle, broker);
+        assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
+        
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+        
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+
+        // clean-up
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        FieldUtils.writeDeclaredField(channel2,
+                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        cleanTableViews();
+    }
+
 
     private static ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
@@ -1375,6 +1632,22 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 });
     }
 
+    private void waitUntilStateWithMonitor(ServiceUnitStateChannel channel, 
String key, ServiceUnitState expected)
+            throws IllegalAccessException {
+        TableViewImpl<ServiceUnitStateData> tv = 
(TableViewImpl<ServiceUnitStateData>)
+                FieldUtils.readField(channel, "tableview", true);
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .until(() -> { // wait until true
+                    ((ServiceUnitStateChannelImpl) channel)
+                            .monitorOwnerships(List.of(lookupServiceAddress1, 
lookupServiceAddress2));
+                    ServiceUnitStateData data = tv.get(key);
+                    ServiceUnitState actual = state(data);
+                    return actual == expected;
+                });
+    }
+
     private static void cleanTableView(ServiceUnitStateChannel channel, String 
serviceUnit)
             throws IllegalAccessException {
         var tv = (TableViewImpl<ServiceUnitStateData>)
@@ -1384,6 +1657,26 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         cache.remove(serviceUnit);
     }
 
+    private void cleanTableViews()
+            throws IllegalAccessException {
+        var tv1 = (TableViewImpl<ServiceUnitStateData>)
+                FieldUtils.readField(channel1, "tableview", true);
+        var cache1 = (ConcurrentMap<String, ServiceUnitStateData>)
+                FieldUtils.readField(tv1, "data", true);
+        cache1.clear();
+
+        var tv2 = (TableViewImpl<ServiceUnitStateData>)
+                FieldUtils.readField(channel2, "tableview", true);
+        var cache2 = (ConcurrentMap<String, ServiceUnitStateData>)
+                FieldUtils.readField(tv2, "data", true);
+        cache2.clear();
+    }
+
+    private void overrideTableViews(String serviceUnit, ServiceUnitStateData 
val) throws IllegalAccessException {
+        overrideTableView(channel1, serviceUnit, val);
+        overrideTableView(channel2, serviceUnit, val);
+    }
+
     private static void overrideTableView(ServiceUnitStateChannel channel, 
String serviceUnit, ServiceUnitStateData val)
             throws IllegalAccessException {
         var tv = (TableViewImpl<ServiceUnitStateData>)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
index 1e23ea74dca..fd50f0d8aed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/models/SplitTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.loadbalance.extensions.channel.models;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -56,12 +55,9 @@ public class SplitTest {
         new Split("A", "B", map);
     }
 
-    @Test
+    @Test(expectedExceptions = IllegalArgumentException.class)
     public void testNullSplitServiceUnitToDestBroker() {
-        var split = new Split("A", "B");
-        assertEquals(split.serviceUnit(), "A");
-        assertEquals(split.sourceBroker(), "B");
-        assertNull(split.splitServiceUnitToDestBroker());
+        var split = new Split("A", "B", null);
     }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
index f7726f8e7a1..f56ad90833b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +58,15 @@ public class SplitSchedulerTest {
     NamespaceBundleSplitStrategy strategy;
     String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
     String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+
+    String childBundle12 = "tenant/namespace/0x7fffffff_0xffffffff";
+
+    String childBundle11 = "tenant/namespace/0x00000000_0x7fffffff";
+    // Halves of bundle2 (0x00000000_0x0FFFFFFF): the split point is 0x07ffffff.
+    String childBundle22 = "tenant/namespace/0x07ffffff_0x0fffffff";
+
+    String childBundle21 = "tenant/namespace/0x00000000_0x07ffffff";
+
     String broker = "broker-1";
     SplitDecision decision1;
     SplitDecision decision2;
@@ -77,11 +88,15 @@ public class SplitSchedulerTest {
         
doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any());
 
         decision1 = new SplitDecision();
-        decision1.setSplit(new Split(bundle1, broker));
+        Split split = new Split(bundle1, broker, Map.of(
+                childBundle11, Optional.empty(), childBundle12, 
Optional.empty()));
+        decision1.setSplit(split);
         decision1.succeed(SplitDecision.Reason.MsgRate);
 
         decision2 = new SplitDecision();
-        decision2.setSplit(new Split(bundle2, broker));
+        Split split2 = new Split(bundle2, broker, Map.of(
+                childBundle21, Optional.empty(), childBundle22, 
Optional.empty()));
+        decision2.setSplit(split2);
         decision2.succeed(SplitDecision.Reason.Sessions);
         Set<SplitDecision> decisions = Set.of(decision1, decision2);
         doReturn(decisions).when(strategy).findBundlesToSplit(any(), any());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
index 8c765e7a3df..8b489c92af0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -28,9 +28,13 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import com.google.common.hash.Hashing;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -38,15 +42,19 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
-import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.PulsarStats;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -71,7 +79,12 @@ public class DefaultNamespaceBundleSplitStrategyTest {
 
     String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
     String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
-
+    Long splitBoundary1 = 0x7fffffffL;
+    Long splitBoundary2 = 0x07ffffffL;
+    String childBundle12 = "0x7fffffff_0xffffffff";
+    String childBundle11 = "0x00000000_0x7fffffff";
+    String childBundle22 = "0x07ffffff_0x0fffffff";
+    String childBundle21 = "0x00000000_0x07ffffff";
     String broker = "broker-1";
 
     @BeforeMethod
@@ -90,19 +103,21 @@ public class DefaultNamespaceBundleSplitStrategyTest {
         brokerService = mock(BrokerService.class);
         pulsarStats = mock(PulsarStats.class);
         namespaceService = mock(NamespaceService.class);
-        namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+
         loadManagerContext = mock(LoadManagerContext.class);
         brokerRegistry = mock(BrokerRegistry.class);
         loadManagerWrapper = mock(ExtensibleLoadManagerWrapper.class);
         loadManager = mock(ExtensibleLoadManagerImpl.class);
         channel = mock(ServiceUnitStateChannelImpl.class);
 
+
+        
doReturn(mock(MetadataStoreExtended.class)).when(pulsar).getLocalMetadataStore();
+        namespaceBundleFactory = spy(new NamespaceBundleFactory(pulsar, 
Hashing.crc32()));
         doReturn(brokerService).when(pulsar).getBrokerService();
         doReturn(config).when(pulsar).getConfiguration();
         doReturn(pulsarStats).when(brokerService).getPulsarStats();
         doReturn(namespaceService).when(pulsar).getNamespaceService();
         
doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory();
-        doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
         doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry();
         doReturn(broker).when(brokerRegistry).getBrokerId();
         doReturn(new 
AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager();
@@ -110,6 +125,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
         doReturn(channel).when(loadManager).getServiceUnitStateChannel();
         doReturn(true).when(channel).isOwner(any());
 
+        var namespaceBundle1 = namespaceBundleFactory.getBundle(
+                LoadManagerShared.getNamespaceNameFromBundleName(bundle1),
+                LoadManagerShared.getBundleRangeFromBundleName(bundle1));
+        var namespaceBundle2 = namespaceBundleFactory.getBundle(
+                LoadManagerShared.getNamespaceNameFromBundleName(bundle2),
+                LoadManagerShared.getBundleRangeFromBundleName(bundle2));
+        doReturn(CompletableFuture.completedFuture(
+                
List.of(splitBoundary1))).when(namespaceService).getSplitBoundary(
+                        eq(namespaceBundle1), eq((List<Long>)null), any());
+        doReturn(CompletableFuture.completedFuture(
+                
List.of(splitBoundary2))).when(namespaceService).getSplitBoundary(
+                        eq(namespaceBundle2), eq((List<Long>)null), any());
 
         bundleStats = new LinkedHashMap<>();
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
@@ -184,6 +211,20 @@ public class DefaultNamespaceBundleSplitStrategyTest {
         verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), 
eq(Unknown));
     }
 
+    public void testSplittingBundle() {
+        var counter = spy(new SplitCounter());
+        
config.setLoadBalancerNamespaceBundleSplitConditionHitCountThreshold(0);
+        bundleStats.values().forEach(v -> v.msgRateIn = 
config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1);
+        doReturn(Map.of("tenant/namespace/0x00000000_0xFFFFFFFF",
+                new ServiceUnitStateData(ServiceUnitState.Splitting, broker, 
1)).entrySet())
+                .when(channel).getOwnershipEntrySet();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        var expected = Set.of();
+        assertEquals(actual, expected);
+        verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), 
eq(Unknown));
+    }
+
     public void testMaxMsgRate() {
         var counter = spy(new SplitCounter());
         var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
@@ -196,14 +237,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
             var actual = strategy.findBundlesToSplit(loadManagerContext, 
pulsar);
             if (i == threshold) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle1, broker));
+                Split split = new Split(bundle1, broker, Map.of(
+                        childBundle11, Optional.empty(), childBundle12, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.MsgRate);
 
                 assertEquals(actual, Set.of(decision1));
                 verify(counter, 
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
             } else if (i == threshold + 1) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle2, broker));
+                Split split = new Split(bundle2, broker, Map.of(
+                        childBundle21, Optional.empty(), childBundle22, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.MsgRate);
 
                 assertEquals(actual, Set.of(decision1));
@@ -224,14 +269,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
             var actual = strategy.findBundlesToSplit(loadManagerContext, 
pulsar);
             if (i == threshold) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle1, broker));
+                Split split = new Split(bundle1, broker, Map.of(
+                        childBundle11, Optional.empty(), childBundle12, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Topics);
 
                 assertEquals(actual, Set.of(decision1));
                 verify(counter, 
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
             } else if (i == threshold + 1) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle2, broker));
+                Split split = new Split(bundle2, broker, Map.of(
+                        childBundle21, Optional.empty(), childBundle22, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Topics);
 
                 assertEquals(actual, Set.of(decision1));
@@ -255,14 +304,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
             var actual = strategy.findBundlesToSplit(loadManagerContext, 
pulsar);
             if (i == threshold) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle1, broker));
+                Split split = new Split(bundle1, broker, Map.of(
+                        childBundle11, Optional.empty(), childBundle12, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Sessions);
 
                 assertEquals(actual, Set.of(decision1));
                 verify(counter, 
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
             } else if (i == threshold + 1) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle2, broker));
+                Split split = new Split(bundle2, broker, Map.of(
+                        childBundle21, Optional.empty(), childBundle22, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Sessions);
 
                 assertEquals(actual, Set.of(decision1));
@@ -286,14 +339,18 @@ public class DefaultNamespaceBundleSplitStrategyTest {
             var actual = strategy.findBundlesToSplit(loadManagerContext, 
pulsar);
             if (i == threshold) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle1, broker));
+                Split split = new Split(bundle1, broker, Map.of(
+                        childBundle11, Optional.empty(), childBundle12, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Bandwidth);
 
                 assertEquals(actual, Set.of(decision1));
                 verify(counter, 
times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
             } else if (i == threshold + 1) {
                 SplitDecision decision1 = new SplitDecision();
-                decision1.setSplit(new Split(bundle2, broker));
+                Split split = new Split(bundle2, broker, Map.of(
+                        childBundle21, Optional.empty(), childBundle22, 
Optional.empty()));
+                decision1.setSplit(split);
                 decision1.succeed(SplitDecision.Reason.Bandwidth);
 
                 assertEquals(actual, Set.of(decision1));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
index ec458def65b..80f9c279a55 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java
@@ -34,6 +34,21 @@ import static org.testng.Assert.fail;
 
 public class SpecifiedPositionsBundleSplitAlgorithmTest {
 
+    @Test
+    public void testEmptyTopicWithForce() {
+        SpecifiedPositionsBundleSplitAlgorithm algorithm = new 
SpecifiedPositionsBundleSplitAlgorithm(true);
+        NamespaceService mockNamespaceService = mock(NamespaceService.class);
+        NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+        doReturn(1L).when(mockNamespaceBundle).getLowerEndpoint();
+        doReturn(1000L).when(mockNamespaceBundle).getUpperEndpoint();
+        doReturn(CompletableFuture.completedFuture(List.of()))
+                
.when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+        List<Long> splitPositions =
+                algorithm.getSplitBoundary(new 
BundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+                        Arrays.asList(1L, 2L))).join();
+        assertEquals(splitPositions, Arrays.asList(2L));
+    }
+
     @Test
     public void testTotalTopicsSizeLessThan1() {
         SpecifiedPositionsBundleSplitAlgorithm algorithm = new 
SpecifiedPositionsBundleSplitAlgorithm();


Reply via email to