This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b2658af8475 [improve][broker] PIP-192: Make unload and transfer admin 
API functional (#19538)
b2658af8475 is described below

commit b2658af847511fef29455a22e96baa1cdc9d278f
Author: Kai Wang <[email protected]>
AuthorDate: Tue Mar 7 16:21:57 2023 +0800

    [improve][broker] PIP-192: Make unload and transfer admin API functional 
(#19538)
    
    PIP-192: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    
    Currently, the unload and transfer admin API still uses the old logic when 
enabling the `ExtensibleLoadManager`,
    we need to publish the message to `ServiceUnitStateChannel`, and wait for 
its response.
    
    ### Modifications
    
    This PR added a `UnloadManager ` to handle the duplicate unload request and 
return a `CompletableFuture` to the caller, so we can know when the unload 
operation is finished.
    
    The unload and transfer admin API also been fixed, now we can support do 
unload when using `ExtensibleLoadManager`.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  17 +-
 .../extensions/ExtensibleLoadManagerImpl.java      |  53 +++++-
 .../channel/ServiceUnitStateChannel.java           |   8 +
 .../channel/ServiceUnitStateChannelImpl.java       |  24 ++-
 .../extensions/channel/StateChangeListeners.java   |  67 ++++++++
 .../extensions/manager/StateChangeListener.java    |  33 ++++
 .../extensions/manager/UnloadManager.java          | 103 ++++++++++++
 .../extensions/manager/package-info.java           |  19 +++
 .../extensions/scheduler/UnloadScheduler.java      |  17 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   8 +
 .../pulsar/broker/web/PulsarWebResource.java       |  13 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  72 +++++++++
 .../extensions/manager/UnloadManagerTest.java      | 178 +++++++++++++++++++++
 .../extensions/scheduler/UnloadSchedulerTest.java  |  16 +-
 14 files changed, 609 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 30f01ece7de..5be675f7b63 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.Subscription;
@@ -983,8 +984,17 @@ public abstract class NamespacesBase extends AdminResource 
{
                     }
                     return CompletableFuture.completedFuture(null);
                 })
-                .thenCompose(__ -> validateLeaderBrokerAsync())
+                .thenCompose(__ -> {
+                    if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return validateLeaderBrokerAsync();
+                })
                 .thenAccept(__ -> {
+                    if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+                        return;
+                    }
+                    // For ExtensibleLoadManager, this operation will be 
ignored.
                     
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
                 });
     }
@@ -1036,10 +1046,11 @@ public abstract class NamespacesBase extends 
AdminResource {
                                         namespaceName, bundleRange);
                                 return CompletableFuture.completedFuture(null);
                             }
+                            Optional<String> destinationBrokerOpt = 
Optional.ofNullable(destinationBroker);
                             return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
                                     authoritative, true)
-                                    .thenCompose(nsBundle ->
-                                            
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
+                                    .thenCompose(nsBundle -> 
pulsar().getNamespaceService()
+                                            .unloadNamespaceBundle(nsBundle, 
destinationBrokerOpt));
                         }));
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 650c12af573..2bebe203d87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
@@ -43,9 +44,11 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import 
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
@@ -110,6 +113,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     private ScheduledFuture brokerLoadDataReportTask;
     private ScheduledFuture topBundlesLoadDataReportTask;
 
+    private UnloadManager unloadManager;
+
     private boolean started = false;
 
     private final AssignCounter assignCounter = new AssignCounter();
@@ -143,6 +148,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         return 
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
     }
 
+    public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
+        if (!(loadManager instanceof ExtensibleLoadManagerWrapper 
loadManagerWrapper)) {
+            throw new IllegalArgumentException("The load manager should be 
'ExtensibleLoadManagerWrapper'.");
+        }
+        return loadManagerWrapper.get();
+    }
+
     @Override
     public void start() throws PulsarServerException {
         if (this.started) {
@@ -151,6 +163,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         this.brokerRegistry = new BrokerRegistryImpl(pulsar);
         this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
         this.brokerRegistry.start();
+        this.unloadManager = new UnloadManager();
+        this.serviceUnitStateChannel.listen(unloadManager);
         this.serviceUnitStateChannel.start();
 
         try {
@@ -201,7 +215,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                         interval, TimeUnit.MILLISECONDS);
 
         // TODO: Start bundle split scheduler.
-        this.unloadScheduler = new 
UnloadScheduler(pulsar.getLoadManagerExecutor(), context, 
serviceUnitStateChannel);
+        this.unloadScheduler = new UnloadScheduler(
+                pulsar.getLoadManagerExecutor(), unloadManager, context, 
serviceUnitStateChannel);
         this.unloadScheduler.start();
         this.started = true;
     }
@@ -300,6 +315,12 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     @Override
     public CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+        return getOwnershipAsync(topic, bundleUnit)
+                .thenApply(broker -> 
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+    }
+
+    private CompletableFuture<Optional<String>> 
getOwnershipAsync(Optional<ServiceUnitId> topic,
+                                                                 ServiceUnitId 
bundleUnit) {
         final String bundle = bundleUnit.toString();
         CompletableFuture<Optional<String>> owner;
         if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
@@ -307,8 +328,35 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         } else {
             owner = serviceUnitStateChannel.getOwnerAsync(bundle);
         }
+        return owner;
+    }
+
+    public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId 
bundle,
+                                                              Optional<String> 
destinationBroker) {
+        return getOwnershipAsync(Optional.empty(), bundle)
+                .thenCompose(brokerOpt -> {
+                    if (brokerOpt.isEmpty()) {
+                        String msg = String.format("Namespace bundle: %s is 
not owned by any broker.", bundle);
+                        log.warn(msg);
+                        throw new IllegalStateException(msg);
+                    }
+                    String sourceBroker = brokerOpt.get();
+                    if (destinationBroker.isPresent() && 
sourceBroker.endsWith(destinationBroker.get())) {
+                        String msg = String.format("Namespace bundle: %s own 
by %s, cannot be transfer to same broker.",
+                                bundle, sourceBroker);
+                        log.warn(msg);
+                        throw new IllegalArgumentException(msg);
+                    }
+                    return unloadAsync(new Unload(sourceBroker, 
bundle.toString(), destinationBroker),
+                            conf.getNamespaceBundleUnloadingTimeoutMs(), 
TimeUnit.MILLISECONDS);
+                });
+    }
 
-        return owner.thenApply(broker -> 
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+    private CompletableFuture<Void> unloadAsync(Unload unload,
+                                               long timeout,
+                                               TimeUnit timeoutUnit) {
+        CompletableFuture<Void> future = 
serviceUnitStateChannel.publishUnloadEventAsync(unload);
+        return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, 
timeoutUnit);
     }
 
     @Override
@@ -337,6 +385,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 try {
                     this.serviceUnitStateChannel.close();
                 } finally {
+                    this.unloadManager.close();
                     this.started = false;
                 }
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 44950a21ffd..dc4d582ddb0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.PulsarServerException;
+import 
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.common.stats.Metrics;
@@ -156,4 +157,11 @@ public interface ServiceUnitStateChannel extends Closeable 
{
      */
     List<Metrics> getMetrics();
 
+    /**
+     * Add a state change listener.
+     *
+     * @param listener State change listener.
+     */
+    void listen(StateChangeListener listener);
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index cddaf92d227..5f24e41dda9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -72,6 +72,7 @@ import 
org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import 
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
@@ -117,6 +118,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final ConcurrentOpenHashMap<String, CompletableFuture<String>> 
getOwnerRequests;
     private final String lookupServiceAddress;
     private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> 
cleanupJobs;
+    private final StateChangeListeners stateChangeListeners;
     private final LeaderElectionService leaderElectionService;
     private BrokerSelectionStrategy brokerSelector;
     private BrokerRegistry brokerRegistry;
@@ -191,6 +193,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         this.getOwnerRequests = ConcurrentOpenHashMap.<String,
                 CompletableFuture<String>>newBuilder().build();
         this.cleanupJobs = ConcurrentOpenHashMap.<String, 
CompletableFuture<Void>>newBuilder().build();
+        this.stateChangeListeners = new StateChangeListeners();
         this.semiTerminalStateWaitingTimeInMillis = 
config.getLoadBalancerServiceUnitStateCleanUpDelayTimeInSeconds()
                 * 1000;
         this.inFlightStateWaitingTimeInMillis = 
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
@@ -357,6 +360,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 log.info("Successfully cancelled the cleanup tasks");
             }
 
+            if (stateChangeListeners != null) {
+                stateChangeListeners.close();
+            }
+
             log.info("Successfully closed the channel.");
 
         } catch (Exception e) {
@@ -619,6 +626,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(data.dstBroker());
         }
+        stateChangeListeners.notify(serviceUnit, data, null);
         if (isTargetBroker(data.dstBroker())) {
             log(null, serviceUnit, data, null);
         }
@@ -628,7 +636,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (isTargetBroker(data.dstBroker())) {
             ServiceUnitStateData next = new ServiceUnitStateData(
                     Owned, data.dstBroker(), data.sourceBroker(), 
getNextVersionId(data));
-            pubAsync(serviceUnit, next)
+            stateChangeListeners.notifyOnCompletion(pubAsync(serviceUnit, 
next), serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, next));
         }
     }
@@ -644,15 +652,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 next = new ServiceUnitStateData(
                         Free, null, data.sourceBroker(), 
getNextVersionId(data));
             }
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
+            
stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit)
+                            .thenCompose(__ -> pubAsync(serviceUnit, next)), 
serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, next));
         }
     }
 
     private void handleSplitEvent(String serviceUnit, ServiceUnitStateData 
data) {
         if (isTargetBroker(data.sourceBroker())) {
-            splitServiceUnit(serviceUnit, data)
+            
stateChangeListeners.notifyOnCompletion(splitServiceUnit(serviceUnit, data), 
serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
         }
     }
@@ -662,6 +670,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(null);
         }
+        stateChangeListeners.notify(serviceUnit, data, null);
         if (isTargetBroker(data.sourceBroker())) {
             log(null, serviceUnit, data, null);
         }
@@ -672,6 +681,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.completeExceptionally(new 
IllegalStateException(serviceUnit + "has been deleted."));
         }
+        stateChangeListeners.notify(serviceUnit, data, null);
         if (isTargetBroker(data.sourceBroker())) {
             log(null, serviceUnit, data, null);
         }
@@ -682,6 +692,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(null);
         }
+        stateChangeListeners.notify(serviceUnit, null, null);
         log(null, serviceUnit, null, null);
     }
 
@@ -1302,4 +1313,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
         return metrics;
     }
+
+    @Override
+    public void listen(StateChangeListener listener) {
+        this.stateChangeListeners.addListener(listener);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
new file mode 100644
index 00000000000..1d396f500b6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
+
+@Slf4j
+public class StateChangeListeners {
+
+    private final List<StateChangeListener> stateChangeListeners;
+
+    public StateChangeListeners() {
+        stateChangeListeners = new CopyOnWriteArrayList<>();
+    }
+
+    public void addListener(StateChangeListener listener) {
+        Objects.requireNonNull(listener);
+        stateChangeListeners.add(listener);
+    }
+
+    public void close() {
+        this.stateChangeListeners.clear();
+    }
+
+    /**
+     * Notify all currently added listeners on completion of the future.
+     *
+     * @return future of a new completion stage
+     */
+    public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> 
future,
+                                                       String serviceUnit,
+                                                       ServiceUnitStateData 
data) {
+        return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+    }
+
+    public void notify(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
+        stateChangeListeners.forEach(listener -> {
+            try {
+                listener.handleEvent(serviceUnit, data, t);
+            } catch (Throwable ex) {
+                log.error("StateChangeListener: {} exception while handling {} 
for service unit {}",
+                        listener, data, serviceUnit, ex);
+            }
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
new file mode 100644
index 00000000000..7ba8be8771b
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+
+public interface StateChangeListener {
+
+    /**
+     * Handle the service unit state change.
+     *
+     * @param serviceUnit - Service Unit(Namespace bundle).
+     * @param data - Service unit state data.
+     * @param t - Exception, if present.
+     */
+    void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable 
t);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
new file mode 100644
index 00000000000..ead6384daba
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+
+/**
+ * Unload manager.
+ */
+@Slf4j
+public class UnloadManager implements StateChangeListener {
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager() {
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
+            if (!future.isDone()) {
+                if (ex != null) {
+                    future.completeExceptionally(ex);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Complete exceptionally unload bundle: {}", 
serviceUnit, ex);
+                    }
+                } else {
+                    future.complete(null);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Complete unload bundle: {}", serviceUnit);
+                    }
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture<Void> waitAsync(CompletableFuture<Void> 
eventPubFuture,
+                                             String bundle,
+                                             long timeout,
+                                             TimeUnit timeoutUnit) {
+
+        return eventPubFuture.thenCompose(__ -> 
inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, 
timeout, timeoutUnit);
+            }
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+                if (ex != null) {
+                    inFlightUnloadRequest.remove(bundle);
+                    log.warn("Failed to wait unload for serviceUnit: {}", 
bundle, ex);
+                }
+            });
+            return future;
+        }));
+    }
+
+    @Override
+    public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+        switch (state) {
+            case Free, Owned -> this.complete(serviceUnit, t);
+            default -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling {} for service unit {}", data, 
serviceUnit);
+                }
+            }
+        }
+    }
+
+    public void close() {
+        inFlightUnloadRequest.forEach((bundle, future) -> {
+            if (!future.isDone()) {
+                String msg = String.format("Unloading bundle: %s, but the 
unload manager already closed.", bundle);
+                log.warn(msg);
+                future.completeExceptionally(new IllegalStateException(msg));
+            }
+        });
+        inFlightUnloadRequest.clear();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
new file mode 100644
index 00000000000..ac553c06900
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
index 5cdbd302710..e6460269787 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Reflections;
@@ -42,6 +43,8 @@ public class UnloadScheduler implements LoadManagerScheduler {
 
     private final ScheduledExecutorService loadManagerExecutor;
 
+    private final UnloadManager unloadManager;
+
     private final LoadManagerContext context;
 
     private final ServiceUnitStateChannel channel;
@@ -57,13 +60,16 @@ public class UnloadScheduler implements 
LoadManagerScheduler {
     private volatile CompletableFuture<Void> currentRunningFuture = null;
 
     public UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+                           UnloadManager unloadManager,
                            LoadManagerContext context,
                            ServiceUnitStateChannel channel) {
-        this(loadManagerExecutor, context, channel, 
createNamespaceUnloadStrategy(context.brokerConfiguration()));
+        this(loadManagerExecutor, unloadManager, context,
+                channel, 
createNamespaceUnloadStrategy(context.brokerConfiguration()));
     }
 
     @VisibleForTesting
     protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+                              UnloadManager unloadManager,
                               LoadManagerContext context,
                               ServiceUnitStateChannel channel,
                               NamespaceUnloadStrategy strategy) {
@@ -71,6 +77,7 @@ public class UnloadScheduler implements LoadManagerScheduler {
         this.recentlyUnloadedBundles = new HashMap<>();
         this.recentlyUnloadedBrokers = new HashMap<>();
         this.loadManagerExecutor = loadManagerExecutor;
+        this.unloadManager = unloadManager;
         this.context = context;
         this.conf = context.brokerConfiguration();
         this.channel = channel;
@@ -131,9 +138,11 @@ public class UnloadScheduler implements 
LoadManagerScheduler {
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
                 unloadDecision.getUnloads().forEach((broker, unload) -> {
                     log.info("[{}] Unloading bundle: {}", 
namespaceUnloadStrategy.getClass().getSimpleName(), unload);
-                    
futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
-                        recentlyUnloadedBundles.put(unload.serviceUnit(), 
System.currentTimeMillis());
-                        recentlyUnloadedBrokers.put(unload.sourceBroker(), 
System.currentTimeMillis());
+                    
futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), 
unload.serviceUnit(),
+                                    
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS)
+                            .thenAccept(__ -> {
+                                
recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+                                
recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
                     }));
                 });
                 return FutureUtil.waitForAll(futures).exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 33b15926c3c..899539e1db6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -717,6 +717,14 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle) {
+        return unloadNamespaceBundle(bundle, Optional.empty());
+    }
+
+    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle, Optional<String> destinationBroker) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return ExtensibleLoadManagerImpl.get(loadManager.get())
+                    .unloadNamespaceBundleAsync(bundle, destinationBroker);
+        }
         // unload namespace bundle
         return unloadNamespaceBundle(bundle, 
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5484a70e1aa..321a127ad97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.BookieResources;
@@ -626,14 +627,18 @@ public abstract class PulsarWebResource {
         }
     }
 
-    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(NamespaceName fqnn,
-              BundlesData bundles, String bundleRange, boolean authoritative, 
boolean readOnly) {
+    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(
+            NamespaceName fqnn, BundlesData bundles, String bundleRange,
+            boolean authoritative, boolean readOnly) {
         NamespaceBundle nsBundle;
         try {
             nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
         } catch (WebApplicationException wae) {
             return CompletableFuture.failedFuture(wae);
         }
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) 
{
+            return CompletableFuture.completedFuture(nsBundle);
+        }
         return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
                 .thenApply(__ -> nsBundle);
     }
@@ -992,6 +997,10 @@ public abstract class PulsarWebResource {
     }
 
     protected static boolean isLeaderBroker(PulsarService pulsar) {
+        // For extensible load manager, it doesn't have leader election 
service on pulsar broker.
+        if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+            return true;
+        }
         return  pulsar.getLeaderElectionService().isLeader();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d1bf29725d0..ec82f5c383e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -43,10 +43,12 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
 import java.util.LinkedHashMap;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -77,18 +79,22 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
+import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -124,10 +130,15 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     public void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        conf.setLoadBalancerSheddingEnabled(false);
         super.internalSetup(conf);
         pulsar1 = pulsar;
         ServiceConfiguration defaultConf = getDefaultConf();
+        defaultConf.setAllowAutoTopicCreation(true);
         
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        defaultConf.setLoadBalancerSheddingEnabled(false);
         additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
         pulsar2 = additionalPulsarTestContext.getPulsarService();
 
@@ -148,6 +159,14 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         channel2 = (ServiceUnitStateChannelImpl)
                 FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
 
+        admin.clusters().createCluster(this.conf.getClusterName(),
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                        Sets.newHashSet(this.conf.getClusterName())));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default",
+                Sets.newHashSet(this.conf.getClusterName()));
     }
 
     protected void beforePulsarStart(PulsarService pulsar) throws Exception {
@@ -307,6 +326,59 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        TopicName topicName = TopicName.get("test-unload");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        checkOwnershipState(broker, bundle);
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+        String dstBrokerServiceUrl;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            dstBrokerUrl = pulsar2.getLookupServiceAddress();
+            dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
+        } else {
+            dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
+        }
+        checkOwnershipState(broker, bundle);
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange(), dstBrokerUrl);
+
+        assertEquals(admin.lookups().lookupTopic(topicName.toString()), 
dstBrokerServiceUrl);
+
+        // Test transfer to current broker.
+        try {
+            admin.namespaces()
+                    .unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange(), dstBrokerUrl);
+            fail();
+        } catch (PulsarAdminException ex) {
+            assertTrue(ex.getMessage().contains("cannot be transfer to same 
broker"));
+        }
+    }
+
+    private void checkOwnershipState(String broker, NamespaceBundle bundle)
+            throws ExecutionException, InterruptedException {
+        var targetLoadManager = secondaryLoadManager;
+        var otherLoadManager = primaryLoadManager;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            targetLoadManager = primaryLoadManager;
+            otherLoadManager = secondaryLoadManager;
+        }
+        assertTrue(targetLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+    }
+
+
     @Test
     public void testGetMetrics() throws Exception {
         {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
new file mode 100644
index 00000000000..75ef913b8a8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class UnloadManagerTest {
+
+    @Test
+    public void testEventPubFutureHasException() {
+        UnloadManager manager = new UnloadManager();
+        CompletableFuture<Void> future =
+                manager.waitAsync(FutureUtil.failedFuture(new 
Exception("test")),
+                        "bundle-1", 10, TimeUnit.SECONDS);
+
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            assertEquals(ex.getCause().getMessage(), "test");
+        }
+    }
+
+    @Test
+    public void testTimeout() throws IllegalAccessException {
+        UnloadManager manager = new UnloadManager();
+        CompletableFuture<Void> future =
+                manager.waitAsync(CompletableFuture.completedFuture(null),
+                        "bundle-1", 3, TimeUnit.SECONDS);
+        Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = 
getInFlightUnloadRequestMap(manager);
+
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        try {
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            assertTrue(ex.getCause() instanceof TimeoutException);
+        }
+
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+    }
+
+    @Test
+    public void testSuccess() throws IllegalAccessException, 
ExecutionException, InterruptedException {
+        UnloadManager manager = new UnloadManager();
+        CompletableFuture<Void> future =
+                manager.waitAsync(CompletableFuture.completedFuture(null),
+                        "bundle-1", 5, TimeUnit.SECONDS);
+        Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = 
getInFlightUnloadRequestMap(manager);
+
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Assigning, 
"broker-1", VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", 
VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Splitting, 
"broker-1", VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Releasing, 
"broker-1", VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", 
VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", 
VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+        future.get();
+
+        // Success with Owned state.
+        future = manager.waitAsync(CompletableFuture.completedFuture(null),
+                "bundle-1", 5, TimeUnit.SECONDS);
+        inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", 
VERSION_ID_INIT), null);
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+        future.get();
+    }
+
+    @Test
+    public void testFailedStage() throws IllegalAccessException {
+        UnloadManager manager = new UnloadManager();
+        CompletableFuture<Void> future =
+                manager.waitAsync(CompletableFuture.completedFuture(null),
+                        "bundle-1", 5, TimeUnit.SECONDS);
+        Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = 
getInFlightUnloadRequestMap(manager);
+
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+        manager.handleEvent("bundle-1",
+                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", 
VERSION_ID_INIT),
+                new IllegalStateException("Failed stage."));
+
+        try {
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            assertTrue(ex.getCause() instanceof IllegalStateException);
+            assertEquals(ex.getCause().getMessage(), "Failed stage.");
+        }
+
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+    }
+
+    @Test
+    public void testClose() throws IllegalAccessException {
+        UnloadManager manager = new UnloadManager();
+        CompletableFuture<Void> future =
+                manager.waitAsync(CompletableFuture.completedFuture(null),
+                        "bundle-1", 5, TimeUnit.SECONDS);
+        Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = 
getInFlightUnloadRequestMap(manager);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
+        manager.close();
+        assertEquals(inFlightUnloadRequestMap.size(), 0);
+
+        try {
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            assertTrue(ex.getCause() instanceof IllegalStateException);
+        }
+    }
+
+    private Map<String, CompletableFuture<Void>> 
getInFlightUnloadRequestMap(UnloadManager manager)
+            throws IllegalAccessException {
+        Map<String, CompletableFuture<Void>> inFlightUnloadRequest =
+                (Map<String, CompletableFuture<Void>>) 
FieldUtils.readField(manager, "inFlightUnloadRequest", true);
+
+        return inFlightUnloadRequest;
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
index cda5f81d81b..73d4eb1f18b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -71,17 +73,20 @@ public class UnloadSchedulerTest {
         LoadManagerContext context = setupContext();
         BrokerRegistry registry = context.brokerRegistry();
         ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        UnloadManager unloadManager = mock(UnloadManager.class);
         NamespaceUnloadStrategy unloadStrategy = 
mock(NamespaceUnloadStrategy.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
         
doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", 
"broker-2")))
                 .when(registry).getAvailableBrokersAsync();
         
doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
+        doReturn(CompletableFuture.completedFuture(null)).when(unloadManager)
+                .waitAsync(any(), any(), anyLong(), any());
         UnloadDecision decision = new UnloadDecision();
         Unload unload = new Unload("broker-1", "bundle-1");
         decision.getUnloads().put("broker-1", unload);
         doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), 
any(), any());
 
-        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
context, channel, unloadStrategy);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
unloadManager, context, channel, unloadStrategy);
 
         scheduler.execute();
 
@@ -101,6 +106,7 @@ public class UnloadSchedulerTest {
         LoadManagerContext context = setupContext();
         BrokerRegistry registry = context.brokerRegistry();
         ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        UnloadManager unloadManager = mock(UnloadManager.class);
         NamespaceUnloadStrategy unloadStrategy = 
mock(NamespaceUnloadStrategy.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
         doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
@@ -112,7 +118,7 @@ public class UnloadSchedulerTest {
                 }
                 return Lists.newArrayList("broker-1", "broker-2");
             }, 
Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
-        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
context, channel, unloadStrategy);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
unloadManager, context, channel, unloadStrategy);
 
         ExecutorService executorService = Executors.newFixedThreadPool(10);
         CountDownLatch latch = new CountDownLatch(10);
@@ -133,7 +139,8 @@ public class UnloadSchedulerTest {
         context.brokerConfiguration().setLoadBalancerEnabled(false);
         ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
         NamespaceUnloadStrategy unloadStrategy = 
mock(NamespaceUnloadStrategy.class);
-        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
context, channel, unloadStrategy);
+        UnloadManager unloadManager = mock(UnloadManager.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
unloadManager, context, channel, unloadStrategy);
 
         scheduler.execute();
 
@@ -152,7 +159,8 @@ public class UnloadSchedulerTest {
         context.brokerConfiguration().setLoadBalancerEnabled(false);
         ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
         NamespaceUnloadStrategy unloadStrategy = 
mock(NamespaceUnloadStrategy.class);
-        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
context, channel, unloadStrategy);
+        UnloadManager unloadManager = mock(UnloadManager.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, 
unloadManager, context, channel, unloadStrategy);
         
doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
 
         scheduler.execute();

Reply via email to