heesung-sn commented on code in PR #19538:
URL: https://github.com/apache/pulsar/pull/19538#discussion_r1117968371


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -118,6 +119,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final LeaderElectionService leaderElectionService;
     private BrokerSelectionStrategy brokerSelector;
     private BrokerRegistry brokerRegistry;
+    private UnloadManager unloadManager;

Review Comment:
   I thought about this dependency, and actually, I think we need the observer 
pattern here.
   
   Conceptually, UnloadManager and other managers subscribe to bundle state 
change events.
   
   `stateChangeListeners <ServiceUnitState, List<func(serviceunit, 
serviceUnitStateData)>>`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -717,6 +721,33 @@ private Optional<Pair<String, String>> 
getLeastLoadedFromLoadManager(ServiceUnit
     }
 
     public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle) {
+        return unloadNamespaceBundle(bundle, null);
+    }
+
+    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle, @Nullable String destinationBroker) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            ExtensibleLoadManagerWrapper loadManagerWrapper = 
(ExtensibleLoadManagerWrapper) loadManager.get();
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
loadManagerWrapper.get();

Review Comment:
   Could we have a static method `ExtensibleLoadManagerImpl 
ExtensibleLoadManagerImpl.get(LoadManager loadmanager)` to isolate this getter 
logic?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java:
##########
@@ -150,6 +151,18 @@ public interface ServiceUnitStateChannel extends Closeable 
{
      */
     CompletableFuture<Void> publishSplitEventAsync(Split split);
 
+    /**
+     * Asynchronously publishes the service unit unload event to the system 
topic in this channel,
+     * and wait to unload operation complete.
+     *
+     * @param unload (unload specification object)
+     * @param timeout The unload operation timeout.
+     * @param timeoutUnit The unload operation timeout unit.
+     * @return the completable future object staged from the event message 
sendAsync.
+     */
+    CompletableFuture<Void> publishUnloadEventAndWaitUnloadComplete(

Review Comment:
   I don't think SUSC is responsible for waiting for the completion of 
events(too much coupling), so the waiting should be handled by `UnloadManager` 
outside of SUSC.
   
   Can we pass the pub future to UnloadManager.handleUnload ? (I propose 
renaming `handleUnload` method to `waitAsync` too)
   ```
   CompletedFuture<Void>  waitAsync(CompletedFuture<Void> unloadEventPubFuture, 
timeout, timeoutUnit){
   
   return eventPubFuture.thenCompose(_ -> { 
   inFlightUnloadRequest.computeIfAbsent(bundle, __ -> {
               if (log.isDebugEnabled()) {
                   log.debug("Handle unload bundle: {}, timeout: {} {}", 
bundle, timeout, timeoutUnit);
    ...
           });
   
   }
   }
   
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {

Review Comment:
   Unload suffix looks redundant.
   
   `complete(..`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {
+        inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {

Review Comment:
   shouldn't we remove the request and then make it complete?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {
+        inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
+            if (!future.isDone()) {
+                future.complete(null);
+                if (log.isDebugEnabled()) {
+                    log.debug("Complete unload bundle: {}", serviceUnit);
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture<Void> handleUnload(String bundle, long timeout, 
TimeUnit timeoutUnit) {
+        return inFlightUnloadRequest.computeIfAbsent(bundle, __ -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, 
timeout, timeoutUnit);
+            }
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            ScheduledFuture<?> taskTimeout = pulsar.getExecutor().schedule(() 
-> {
+                if (!future.isDone()) {
+                    String msg = String.format("Unloading bundle: %s has timed 
out, cancel the future.", bundle);
+                    log.warn(msg);
+                    // Complete the future with error
+                    future.completeExceptionally(new TimeoutException(msg));

Review Comment:
   we need to remove this future from the inflight map.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {
+        inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
+            if (!future.isDone()) {
+                future.complete(null);
+                if (log.isDebugEnabled()) {
+                    log.debug("Complete unload bundle: {}", serviceUnit);
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture<Void> handleUnload(String bundle, long timeout, 
TimeUnit timeoutUnit) {

Review Comment:
   As discussed above, I think `waitAsync` is the better name.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java:
##########
@@ -992,6 +1004,10 @@ protected boolean isLeaderBroker() {
     }
 
     protected static boolean isLeaderBroker(PulsarService pulsar) {
+        // For extensible load manager, it doesn't have leader election 
service on pulsar broker.
+        if (pulsar.getLeaderElectionService() == null) {

Review Comment:
   Can we not pass this authoritative flag when using the LM extension?
   
   We do have the leader broker in BSC, but here we always return false from 
the LM extension, which is counter-intuitive.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -307,6 +325,53 @@ public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> broker
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        TopicName topicName = TopicName.get("test-unload");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        var loadManager = secondaryLoadManager;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            loadManager = primaryLoadManager;
+        }
+
+        assertTrue(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());

Review Comment:
   can we assert both primary and secondary load manager?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -307,6 +325,53 @@ public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> broker
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        TopicName topicName = TopicName.get("test-unload");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        var loadManager = secondaryLoadManager;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            loadManager = primaryLoadManager;
+        }
+
+        assertTrue(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+        loadManager = secondaryLoadManager;
+
+        String othersBrokerUrl = pulsar1.getLookupServiceAddress();
+        String othersBrokerServiceUrl;

Review Comment:
   dstBrokerServiceUrl



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -307,6 +325,53 @@ public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> broker
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        TopicName topicName = TopicName.get("test-unload");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        var loadManager = secondaryLoadManager;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            loadManager = primaryLoadManager;
+        }
+
+        assertTrue(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+        loadManager = secondaryLoadManager;
+
+        String othersBrokerUrl = pulsar1.getLookupServiceAddress();
+        String othersBrokerServiceUrl;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            loadManager = primaryLoadManager;
+            othersBrokerUrl = pulsar2.getLookupServiceAddress();
+            othersBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
+        } else {
+            othersBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
+        }
+        assertTrue(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());

Review Comment:
   can we assert both primary and secondary load manager?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -717,6 +721,33 @@ private Optional<Pair<String, String>> 
getLeastLoadedFromLoadManager(ServiceUnit
     }
 
     public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle) {
+        return unloadNamespaceBundle(bundle, null);
+    }
+
+    public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle 
bundle, @Nullable String destinationBroker) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            ExtensibleLoadManagerWrapper loadManagerWrapper = 
(ExtensibleLoadManagerWrapper) loadManager.get();
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
loadManagerWrapper.get();
+            ServiceUnitStateChannel channel = 
extensibleLoadManager.getServiceUnitStateChannel();
+            return extensibleLoadManager.getOwnershipAsync(Optional.empty(), 
bundle)

Review Comment:
   Can we define a method `CompletableFuture<Void> unloadNamespaceBundle(..)` 
in `ExtensibleLoadManager` and move this logic there?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {
+        inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
+            if (!future.isDone()) {
+                future.complete(null);
+                if (log.isDebugEnabled()) {
+                    log.debug("Complete unload bundle: {}", serviceUnit);
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture<Void> handleUnload(String bundle, long timeout, 
TimeUnit timeoutUnit) {
+        return inFlightUnloadRequest.computeIfAbsent(bundle, __ -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, 
timeout, timeoutUnit);
+            }
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            ScheduledFuture<?> taskTimeout = pulsar.getExecutor().schedule(() 
-> {

Review Comment:
   Can we use `future.orTimeout` for this waiting logic and remove this unload 
if timeout?
   
   we have a similar pattern in the SUSC.
   ```
    future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
                               .whenComplete((v, e) -> {
                                           if (e != null) {
                                               
getOwnerRequests.remove(serviceUnit, future);
                                               log.warn("Failed to getOwner for 
serviceUnit:{}",
                                                       serviceUnit, e);
                                           }
                                       }
                               );
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+
+@Slf4j
+public class UnloadManager {
+
+    private final PulsarService pulsar;
+
+    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+    public UnloadManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+    }
+
+    public void completeUnload(String serviceUnit) {

Review Comment:
   Plz consider updating the UnloadCounter as well if you can update them in 
this PR. I am fine with another PR with this.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java:
##########
@@ -131,9 +131,11 @@ public synchronized void execute() {
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
                 unloadDecision.getUnloads().forEach((broker, unload) -> {
                     log.info("[{}] Unloading bundle: {}", 
namespaceUnloadStrategy.getClass().getSimpleName(), unload);
-                    
futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
-                        recentlyUnloadedBundles.put(unload.serviceUnit(), 
System.currentTimeMillis());
-                        recentlyUnloadedBrokers.put(unload.sourceBroker(), 
System.currentTimeMillis());
+                    
futures.add(channel.publishUnloadEventAndWaitUnloadComplete(
+                            unload, 
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS)
+                            .thenAccept(__ -> {
+                                
recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+                                
recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());

Review Comment:
   Please consider UnloadMetrics if you can update it in this PR. I am fine 
with another PR for this.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java:
##########
@@ -626,15 +627,19 @@ protected NamespaceBundle 
validateNamespaceBundleOwnership(NamespaceName fqnn, B
         }
     }
 
-    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(NamespaceName fqnn,
-              BundlesData bundles, String bundleRange, boolean authoritative, 
boolean readOnly) {
+    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(
+            NamespaceName fqnn, BundlesData bundles, String bundleRange,
+            boolean authoritative, boolean readOnly, String destinationBroker) 
{

Review Comment:
   `Optional<String> destinationBroker`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java:
##########
@@ -307,6 +325,53 @@ public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> broker
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        TopicName topicName = TopicName.get("test-unload");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        var loadManager = secondaryLoadManager;
+        if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+            loadManager = primaryLoadManager;
+        }
+
+        assertTrue(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(loadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+        loadManager = secondaryLoadManager;
+
+        String othersBrokerUrl = pulsar1.getLookupServiceAddress();

Review Comment:
   dstBrokerUrl



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to