This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c59001c9c599bb151c2fcf07fe609898d44bcc72
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue May 21 15:53:26 2024 -0700

    [fix][broker] Immediately tombstone Deleted and Free state bundles (#22743)
    
    (cherry picked from commit 949260f190c3ff48d16f9450083c2e8c5c9ff302)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  5 +--
 .../channel/ServiceUnitStateChannelImpl.java       | 18 +++++++---
 .../extensions/manager/SplitManager.java           |  2 +-
 .../extensions/manager/UnloadManager.java          | 14 ++++++--
 .../pulsar/broker/namespace/NamespaceService.java  |  5 +--
 .../extensions/ExtensibleLoadManagerImplTest.java  | 33 +++++++++++++++--
 .../channel/ServiceUnitStateChannelTest.java       | 42 ++++++++++++----------
 .../extensions/manager/SplitManagerTest.java       | 27 +++-----------
 .../extensions/manager/UnloadManagerTest.java      | 27 +++++++++++---
 9 files changed, 113 insertions(+), 60 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index b35143c5d81..ad5e539ed33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -668,7 +668,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     }
 
     public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId 
bundle,
-                                                              Optional<String> 
destinationBroker) {
+                                                              Optional<String> 
destinationBroker,
+                                                              boolean force) {
         if 
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
 {
             log.info("Skip unloading namespace bundle: {}.", bundle);
             return CompletableFuture.completedFuture(null);
@@ -687,7 +688,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                         log.warn(msg);
                         throw new IllegalArgumentException(msg);
                     }
-                    Unload unload = new Unload(sourceBroker, 
bundle.toString(), destinationBroker, true);
+                    Unload unload = new Unload(sourceBroker, 
bundle.toString(), destinationBroker, force);
                     UnloadDecision unloadDecision =
                             new UnloadDecision(unload, 
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
                     return unloadAsync(unloadDecision,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f6fb4503b03..76c4c1dc51a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -841,6 +841,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                         Free, null, data.sourceBroker(), 
getNextVersionId(data));
                 unloadFuture = closeServiceUnit(serviceUnit, true);
             }
+            // If the optimized bundle unload is disabled, disconnect the 
clients at time of RELEASE.
             stateChangeListeners.notifyOnCompletion(unloadFuture
                             .thenCompose(__ -> pubAsync(serviceUnit, next)), 
serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, next));
@@ -861,9 +862,12 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         if (isTargetBroker(data.sourceBroker())) {
-            stateChangeListeners.notifyOnCompletion(
-                            data.force() ? closeServiceUnit(serviceUnit, true)
-                                    : CompletableFuture.completedFuture(0), 
serviceUnit, data)
+            // If data.force(), try closeServiceUnit and tombstone the bundle.
+            CompletableFuture<Void> future =
+                    (data.force() ? closeServiceUnit(serviceUnit, true)
+                            .thenCompose(__ -> tombstoneAsync(serviceUnit))
+                            : 
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
+            stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
         } else {
             stateChangeListeners.notify(serviceUnit, data, null);
@@ -875,9 +879,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.completeExceptionally(new 
IllegalStateException(serviceUnit + "has been deleted."));
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+
         if (isTargetBroker(data.sourceBroker())) {
-            log(null, serviceUnit, data, null);
+            stateChangeListeners.notifyOnCompletion(
+                            tombstoneAsync(serviceUnit), serviceUnit, data)
+                    .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+        } else {
+            stateChangeListeners.notify(serviceUnit, data, null);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
index 71ebbc92a87..ac21e4c6241 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
@@ -97,7 +97,7 @@ public class SplitManager implements StateChangeListener {
             return;
         }
         switch (state) {
-            case Deleted, Owned, Init -> this.complete(serviceUnit, t);
+            case Init -> this.complete(serviceUnit, t);
             default -> {
                 if (log.isDebugEnabled()) {
                     log.debug("Handling {} for service unit {}", data, 
serviceUnit);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index bf9885b2a25..742b23dc2d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -93,7 +93,7 @@ public class UnloadManager implements StateChangeListener {
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
         ServiceUnitState state = ServiceUnitStateData.state(data);
 
-        if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || 
state == Assigning)) {
+        if ((state == Owned || state == Assigning) && 
StringUtils.isBlank(data.sourceBroker())) {
             if (log.isDebugEnabled()) {
                 log.debug("Skipping {} for service unit {} from the assignment 
command.", data, serviceUnit);
             }
@@ -113,7 +113,17 @@ public class UnloadManager implements StateChangeListener {
         }
 
         switch (state) {
-            case Free, Owned -> this.complete(serviceUnit, t);
+            case Free -> {
+                if (!data.force()) {
+                    complete(serviceUnit, t);
+                }
+            }
+            case Init -> {
+                if (data.force()) {
+                    complete(serviceUnit, t);
+                }
+            }
+            case Owned -> complete(serviceUnit, t);
             default -> {
                 if (log.isDebugEnabled()) {
                     log.debug("Handling {} for service unit {}", data, 
serviceUnit);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 187290566d4..16c8c4c702f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -783,7 +783,7 @@ public class NamespaceService implements AutoCloseable {
                                                          boolean 
closeWithoutWaitingClientDisconnect) {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return ExtensibleLoadManagerImpl.get(loadManager.get())
-                    .unloadNamespaceBundleAsync(bundle, destinationBroker);
+                    .unloadNamespaceBundleAsync(bundle, destinationBroker, 
false);
         }
         // unload namespace bundle
         OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
@@ -1232,7 +1232,8 @@ public class NamespaceService implements AutoCloseable {
         CompletableFuture<Void> future;
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
-            future = 
extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
+            future = extensibleLoadManager.unloadNamespaceBundleAsync(
+                    nsBundle, Optional.empty(), true);
         } else {
             future = ownershipCache.removeOwnership(nsBundle);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index f9c1bf97a2f..e4ab71ba2a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -620,7 +620,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
 
     @Test(timeOut = 30 * 1000)
     public void testSplitBundleAdminAPI() throws Exception {
-        String namespace = defaultTestNamespace;
+        final String namespace = "public/testSplitBundleAdminAPI";
+        admin.namespaces().createNamespace(namespace, 1);
         Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-split");
         TopicName topicName = topicAndBundle.getLeft();
         admin.topics().createPartitionedTopic(topicName.toString(), 10);
@@ -674,6 +675,30 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         } catch (PulsarAdminException ex) {
             assertTrue(ex.getMessage().contains("Invalid bundle range"));
         }
+
+
+        // delete and retry
+        Awaitility.await()
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    admin.namespaces().deleteNamespace(namespace);
+                });
+        admin.namespaces().createNamespace(namespace, 1);
+        admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, 
null);
+
+        Awaitility.await()
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    BundlesData bundlesData = 
admin.namespaces().getBundles(namespace);
+                    assertEquals(bundlesData.getNumBundles(), numBundles + 1);
+                    String lowBundle = String.format("0x%08x", 
bundleRanges.get(0));
+                    String midBundle = String.format("0x%08x", mid);
+                    String highBundle = String.format("0x%08x", 
bundleRanges.get(1));
+                    
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
+                    
assertTrue(bundlesData.getBoundaries().contains(midBundle));
+                    
assertTrue(bundlesData.getBoundaries().contains(highBundle));
+                    assertEquals(splitCount.get(), 2);
+                });
     }
 
     @Test(timeOut = 30 * 1000)
@@ -1489,7 +1514,11 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         NamespaceEphemeralData namespaceEphemeralData = 
primaryLoadManager.tryAcquiringOwnership(bundle).get();
         assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrl())
                 .contains(namespaceEphemeralData.getNativeUrl()));
-        admin.namespaces().deleteNamespace(namespace);
+        Awaitility.await()
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    admin.namespaces().deleteNamespace(namespace, true);
+                });
     }
 
     @Test(timeOut = 30 * 1000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index fe8387710ee..dd08400ea85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -576,11 +576,11 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 childBundle1Range, Optional.empty(), childBundle2Range, 
Optional.empty()));
         channel1.publishSplitEventAsync(split);
 
-        waitUntilState(channel1, bundle, Deleted);
-        waitUntilState(channel2, bundle, Deleted);
+        waitUntilState(channel1, bundle, Init);
+        waitUntilState(channel2, bundle, Init);
 
-        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 
1, 0);
-        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 
1, 0);
+        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 
1, 0);
+        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 
1, 0);
         validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
         validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
         // Verify the retry count
@@ -620,7 +620,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
         validateMonitorCounters(leader,
                 0,
-                1,
+                0,
                 0,
                 0,
                 0,
@@ -1236,15 +1236,15 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
 
-        waitUntilStateWithMonitor(leader, bundle, Deleted);
-        waitUntilStateWithMonitor(channel1, bundle, Deleted);
-        waitUntilStateWithMonitor(channel2, bundle, Deleted);
+        waitUntilStateWithMonitor(leader, bundle, Init);
+        waitUntilStateWithMonitor(channel1, bundle, Init);
+        waitUntilStateWithMonitor(channel2, bundle, Init);
 
         var ownerAddr1 = channel1.getOwnerAsync(bundle);
         var ownerAddr2 = channel2.getOwnerAsync(bundle);
 
-        assertTrue(ownerAddr1.isCompletedExceptionally());
-        assertTrue(ownerAddr2.isCompletedExceptionally());
+        assertTrue(ownerAddr1.get().isEmpty());
+        assertTrue(ownerAddr2.get().isEmpty());
 
 
         FieldUtils.writeDeclaredField(channel1,
@@ -1428,13 +1428,15 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
         ((ServiceUnitStateChannelImpl) leader)
                 .monitorOwnerships(List.of(brokerId1, brokerId2));
-        waitUntilState(leader, bundle3, Deleted);
-        waitUntilState(channel1, bundle3, Deleted);
-        waitUntilState(channel2, bundle3, Deleted);
+
+        waitUntilState(leader, bundle3, Init);
+        waitUntilState(channel1, bundle3, Init);
+        waitUntilState(channel2, bundle3, Init);
 
 
-        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 
1, 0);
-        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 
1, 0);
+
+        validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 
1, 0);
+        validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 
1, 0);
         validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
         validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
 
@@ -1464,7 +1466,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         validateMonitorCounters(leader,
                 0,
-                1,
+                0,
                 1,
                 0,
                 0,
@@ -1542,7 +1544,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel2, ownedBundle, brokerId2);
         assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
         
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
-        
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+        assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
 
         // clean-up
         FieldUtils.writeDeclaredField(leaderChannel, 
"maxCleanupDelayTimeInSecs", 3 * 60, true);
@@ -1605,7 +1607,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel2, ownedBundle, broker);
         assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
         
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
-        
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
+        assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
 
         // clean-up
         FieldUtils.writeDeclaredField(channel1,
@@ -1663,8 +1665,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 20 * 1000, true);
         start = System.currentTimeMillis();
         assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
-        assertTrue(System.currentTimeMillis() - start < 20_000);
+        waitUntilState(channel1, bundle, Init);
+        waitUntilState(channel2, bundle, Init);
 
+        assertTrue(System.currentTimeMillis() - start < 20_000);
         // simulate ownership cleanup(brokerId1 selected owner) by the leader 
channel
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
index 3287306ab48..57b7830214b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
@@ -123,40 +123,23 @@ public class SplitManagerTest {
         manager.handleEvent(bundle,
                 new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, 
VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequests.size(), 1);
-        assertEquals(counter.toMetrics(null).toString(),
-                counterExpected.toMetrics(null).toString());
 
         manager.handleEvent(bundle,
                 new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, 
VERSION_ID_INIT), null);
-        counterExpected.update(SplitDecision.Label.Success, Sessions);
-        assertEquals(inFlightUnloadRequests.size(), 0);
-        assertEquals(counter.toMetrics(null).toString(),
-                counterExpected.toMetrics(null).toString());
+        assertEquals(inFlightUnloadRequests.size(), 1);
 
-        // Success with Init state.
-        future = manager.waitAsync(CompletableFuture.completedFuture(null),
-                bundle, decision, 5, TimeUnit.SECONDS);
-        inFlightUnloadRequests = getinFlightUnloadRequests(manager);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequests.size(), 1);
+
+        // Success with Init state.
         manager.handleEvent(bundle,
                 new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, 
VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequests.size(), 0);
         counterExpected.update(SplitDecision.Label.Success, Sessions);
         assertEquals(counter.toMetrics(null).toString(),
                 counterExpected.toMetrics(null).toString());
-        future.get();
 
-        // Success with Owned state.
-        future = manager.waitAsync(CompletableFuture.completedFuture(null),
-                bundle, decision, 5, TimeUnit.SECONDS);
-        inFlightUnloadRequests = getinFlightUnloadRequests(manager);
-        assertEquals(inFlightUnloadRequests.size(), 1);
-        manager.handleEvent(bundle,
-                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
VERSION_ID_INIT), null);
-        assertEquals(inFlightUnloadRequests.size(), 0);
-        counterExpected.update(SplitDecision.Label.Success, Sessions);
-        assertEquals(counter.toMetrics(null).toString(),
-                counterExpected.toMetrics(null).toString());
         future.get();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 45b1cd9544f..b7dae620629 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -123,11 +123,15 @@ public class UnloadManagerTest {
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
         manager.handleEvent(bundle,
-                new ServiceUnitStateData(ServiceUnitState.Init, null, 
srcBroker, VERSION_ID_INIT), null);
+                new ServiceUnitStateData(ServiceUnitState.Free, null, 
srcBroker, true, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
+        // Success with Init state.
         manager.handleEvent(bundle,
-                new ServiceUnitStateData(ServiceUnitState.Free, null, 
srcBroker, VERSION_ID_INIT), null);
+                new ServiceUnitStateData(ServiceUnitState.Init, null, 
srcBroker, false, VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Init, null, 
srcBroker, true, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
         future.get();
         
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
@@ -137,17 +141,30 @@ public class UnloadManagerTest {
                 bundle, unloadDecision, 5, TimeUnit.SECONDS);
         inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
-
         manager.handleEvent(bundle,
                 new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
null, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
-
         manager.handleEvent(bundle,
                 new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
-
         future.get();
         
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
+
+        // Success with Free state.
+        future = manager.waitAsync(CompletableFuture.completedFuture(null),
+                bundle, unloadDecision, 5, TimeUnit.SECONDS);
+        inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, 
srcBroker, true, VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, 
srcBroker, false, VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+        future.get();
+        
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3);
+
+
     }
 
     @Test

Reply via email to