This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b2658af8475 [improve][broker] PIP-192: Make unload and transfer admin
API functional (#19538)
b2658af8475 is described below
commit b2658af847511fef29455a22e96baa1cdc9d278f
Author: Kai Wang <[email protected]>
AuthorDate: Tue Mar 7 16:21:57 2023 +0800
[improve][broker] PIP-192: Make unload and transfer admin API functional
(#19538)
PIP-192: https://github.com/apache/pulsar/issues/16691
### Motivation
Currently, the unload and transfer admin APIs still use the old logic when
the `ExtensibleLoadManager` is enabled.
We need to publish the message to the `ServiceUnitStateChannel` and wait for
its response.
### Modifications
This PR adds an `UnloadManager` to handle duplicate unload requests and
return a `CompletableFuture` to the caller, so we know when the unload
operation is finished.
The unload and transfer admin APIs have also been fixed, so unload is now
supported when using the `ExtensibleLoadManager`.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 17 +-
.../extensions/ExtensibleLoadManagerImpl.java | 53 +++++-
.../channel/ServiceUnitStateChannel.java | 8 +
.../channel/ServiceUnitStateChannelImpl.java | 24 ++-
.../extensions/channel/StateChangeListeners.java | 67 ++++++++
.../extensions/manager/StateChangeListener.java | 33 ++++
.../extensions/manager/UnloadManager.java | 103 ++++++++++++
.../extensions/manager/package-info.java | 19 +++
.../extensions/scheduler/UnloadScheduler.java | 17 +-
.../pulsar/broker/namespace/NamespaceService.java | 8 +
.../pulsar/broker/web/PulsarWebResource.java | 13 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 72 +++++++++
.../extensions/manager/UnloadManagerTest.java | 178 +++++++++++++++++++++
.../extensions/scheduler/UnloadSchedulerTest.java | 16 +-
14 files changed, 609 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 30f01ece7de..5be675f7b63 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
@@ -983,8 +984,17 @@ public abstract class NamespacesBase extends AdminResource
{
}
return CompletableFuture.completedFuture(null);
})
- .thenCompose(__ -> validateLeaderBrokerAsync())
+ .thenCompose(__ -> {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return validateLeaderBrokerAsync();
+ })
.thenAccept(__ -> {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ return;
+ }
+ // For ExtensibleLoadManager, this operation will be
ignored.
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange,
destinationBroker);
});
}
@@ -1036,10 +1046,11 @@ public abstract class NamespacesBase extends
AdminResource {
namespaceName, bundleRange);
return CompletableFuture.completedFuture(null);
}
+ Optional<String> destinationBrokerOpt =
Optional.ofNullable(destinationBroker);
return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
authoritative, true)
- .thenCompose(nsBundle ->
-
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
+ .thenCompose(nsBundle ->
pulsar().getNamespaceService()
+ .unloadNamespaceBundle(nsBundle,
destinationBrokerOpt));
}));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 650c12af573..2bebe203d87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
@@ -43,9 +44,11 @@ import
org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
@@ -110,6 +113,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private ScheduledFuture brokerLoadDataReportTask;
private ScheduledFuture topBundlesLoadDataReportTask;
+ private UnloadManager unloadManager;
+
private boolean started = false;
private final AssignCounter assignCounter = new AssignCounter();
@@ -143,6 +148,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
+ public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
+ if (!(loadManager instanceof ExtensibleLoadManagerWrapper
loadManagerWrapper)) {
+ throw new IllegalArgumentException("The load manager should be
'ExtensibleLoadManagerWrapper'.");
+ }
+ return loadManagerWrapper.get();
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
@@ -151,6 +163,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
+ this.unloadManager = new UnloadManager();
+ this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.start();
try {
@@ -201,7 +215,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
interval, TimeUnit.MILLISECONDS);
// TODO: Start bundle split scheduler.
- this.unloadScheduler = new
UnloadScheduler(pulsar.getLoadManagerExecutor(), context,
serviceUnitStateChannel);
+ this.unloadScheduler = new UnloadScheduler(
+ pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel);
this.unloadScheduler.start();
this.started = true;
}
@@ -300,6 +315,12 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
@Override
public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+ return getOwnershipAsync(topic, bundleUnit)
+ .thenApply(broker ->
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+ }
+
+ private CompletableFuture<Optional<String>>
getOwnershipAsync(Optional<ServiceUnitId> topic,
+ ServiceUnitId
bundleUnit) {
final String bundle = bundleUnit.toString();
CompletableFuture<Optional<String>> owner;
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
@@ -307,8 +328,35 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle);
}
+ return owner;
+ }
+
+ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId
bundle,
+ Optional<String>
destinationBroker) {
+ return getOwnershipAsync(Optional.empty(), bundle)
+ .thenCompose(brokerOpt -> {
+ if (brokerOpt.isEmpty()) {
+ String msg = String.format("Namespace bundle: %s is
not owned by any broker.", bundle);
+ log.warn(msg);
+ throw new IllegalStateException(msg);
+ }
+ String sourceBroker = brokerOpt.get();
+ if (destinationBroker.isPresent() &&
sourceBroker.endsWith(destinationBroker.get())) {
+ String msg = String.format("Namespace bundle: %s own
by %s, cannot be transfer to same broker.",
+ bundle, sourceBroker);
+ log.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ return unloadAsync(new Unload(sourceBroker,
bundle.toString(), destinationBroker),
+ conf.getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS);
+ });
+ }
- return owner.thenApply(broker ->
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+ private CompletableFuture<Void> unloadAsync(Unload unload,
+ long timeout,
+ TimeUnit timeoutUnit) {
+ CompletableFuture<Void> future =
serviceUnitStateChannel.publishUnloadEventAsync(unload);
+ return unloadManager.waitAsync(future, unload.serviceUnit(), timeout,
timeoutUnit);
}
@Override
@@ -337,6 +385,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
try {
this.serviceUnitStateChannel.close();
} finally {
+ this.unloadManager.close();
this.started = false;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 44950a21ffd..dc4d582ddb0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
+import
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.common.stats.Metrics;
@@ -156,4 +157,11 @@ public interface ServiceUnitStateChannel extends Closeable
{
*/
List<Metrics> getMetrics();
+ /**
+ * Add a state change listener.
+ *
+ * @param listener State change listener.
+ */
+ void listen(StateChangeListener listener);
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index cddaf92d227..5f24e41dda9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -72,6 +72,7 @@ import
org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
@@ -117,6 +118,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private final ConcurrentOpenHashMap<String, CompletableFuture<String>>
getOwnerRequests;
private final String lookupServiceAddress;
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>>
cleanupJobs;
+ private final StateChangeListeners stateChangeListeners;
private final LeaderElectionService leaderElectionService;
private BrokerSelectionStrategy brokerSelector;
private BrokerRegistry brokerRegistry;
@@ -191,6 +193,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
this.getOwnerRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<String>>newBuilder().build();
this.cleanupJobs = ConcurrentOpenHashMap.<String,
CompletableFuture<Void>>newBuilder().build();
+ this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis =
config.getLoadBalancerServiceUnitStateCleanUpDelayTimeInSeconds()
* 1000;
this.inFlightStateWaitingTimeInMillis =
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
@@ -357,6 +360,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.info("Successfully cancelled the cleanup tasks");
}
+ if (stateChangeListeners != null) {
+ stateChangeListeners.close();
+ }
+
log.info("Successfully closed the channel.");
} catch (Exception e) {
@@ -619,6 +626,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
+ stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
}
@@ -628,7 +636,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (isTargetBroker(data.dstBroker())) {
ServiceUnitStateData next = new ServiceUnitStateData(
Owned, data.dstBroker(), data.sourceBroker(),
getNextVersionId(data));
- pubAsync(serviceUnit, next)
+ stateChangeListeners.notifyOnCompletion(pubAsync(serviceUnit,
next), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
}
@@ -644,15 +652,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(),
getNextVersionId(data));
}
- closeServiceUnit(serviceUnit)
- .thenCompose(__ -> pubAsync(serviceUnit, next))
+
stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit)
+ .thenCompose(__ -> pubAsync(serviceUnit, next)),
serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
}
private void handleSplitEvent(String serviceUnit, ServiceUnitStateData
data) {
if (isTargetBroker(data.sourceBroker())) {
- splitServiceUnit(serviceUnit, data)
+
stateChangeListeners.notifyOnCompletion(splitServiceUnit(serviceUnit, data),
serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
}
}
@@ -662,6 +670,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
+ stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
}
@@ -672,6 +681,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new
IllegalStateException(serviceUnit + "has been deleted."));
}
+ stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
}
@@ -682,6 +692,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
+ stateChangeListeners.notify(serviceUnit, null, null);
log(null, serviceUnit, null, null);
}
@@ -1302,4 +1313,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return metrics;
}
+
+ @Override
+ public void listen(StateChangeListener listener) {
+ this.stateChangeListeners.addListener(listener);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
new file mode 100644
index 00000000000..1d396f500b6
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.channel;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
+
+@Slf4j
+public class StateChangeListeners {
+
+ private final List<StateChangeListener> stateChangeListeners;
+
+ public StateChangeListeners() {
+ stateChangeListeners = new CopyOnWriteArrayList<>();
+ }
+
+ public void addListener(StateChangeListener listener) {
+ Objects.requireNonNull(listener);
+ stateChangeListeners.add(listener);
+ }
+
+ public void close() {
+ this.stateChangeListeners.clear();
+ }
+
+ /**
+ * Notify all currently added listeners on completion of the future.
+ *
+ * @return future of a new completion stage
+ */
+ public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T>
future,
+ String serviceUnit,
+ ServiceUnitStateData
data) {
+ return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+ }
+
+ public void notify(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
+ stateChangeListeners.forEach(listener -> {
+ try {
+ listener.handleEvent(serviceUnit, data, t);
+ } catch (Throwable ex) {
+ log.error("StateChangeListener: {} exception while handling {}
for service unit {}",
+ listener, data, serviceUnit, ex);
+ }
+ });
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
new file mode 100644
index 00000000000..7ba8be8771b
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+
+public interface StateChangeListener {
+
+ /**
+ * Handle the service unit state change.
+ *
+ * @param serviceUnit - Service Unit(Namespace bundle).
+ * @param data - Service unit state data.
+ * @param t - Exception, if present.
+ */
+ void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
t);
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
new file mode 100644
index 00000000000..ead6384daba
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+
+/**
+ * Unload manager.
+ */
+@Slf4j
+public class UnloadManager implements StateChangeListener {
+
+ private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+
+ public UnloadManager() {
+ this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+ }
+
+ private void complete(String serviceUnit, Throwable ex) {
+ inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
+ if (!future.isDone()) {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ if (log.isDebugEnabled()) {
+ log.debug("Complete exceptionally unload bundle: {}",
serviceUnit, ex);
+ }
+ } else {
+ future.complete(null);
+ if (log.isDebugEnabled()) {
+ log.debug("Complete unload bundle: {}", serviceUnit);
+ }
+ }
+ }
+ return null;
+ });
+ }
+
+ public CompletableFuture<Void> waitAsync(CompletableFuture<Void>
eventPubFuture,
+ String bundle,
+ long timeout,
+ TimeUnit timeoutUnit) {
+
+ return eventPubFuture.thenCompose(__ ->
inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Handle unload bundle: {}, timeout: {} {}", bundle,
timeout, timeoutUnit);
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+ if (ex != null) {
+ inFlightUnloadRequest.remove(bundle);
+ log.warn("Failed to wait unload for serviceUnit: {}",
bundle, ex);
+ }
+ });
+ return future;
+ }));
+ }
+
+ @Override
+ public void handleEvent(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
+ ServiceUnitState state = ServiceUnitStateData.state(data);
+ switch (state) {
+ case Free, Owned -> this.complete(serviceUnit, t);
+ default -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {}", data,
serviceUnit);
+ }
+ }
+ }
+ }
+
+ public void close() {
+ inFlightUnloadRequest.forEach((bundle, future) -> {
+ if (!future.isDone()) {
+ String msg = String.format("Unloading bundle: %s, but the
unload manager already closed.", bundle);
+ log.warn(msg);
+ future.completeExceptionally(new IllegalStateException(msg));
+ }
+ });
+ inFlightUnloadRequest.clear();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
new file mode 100644
index 00000000000..ac553c06900
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
index 5cdbd302710..e6460269787 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
@@ -42,6 +43,8 @@ public class UnloadScheduler implements LoadManagerScheduler {
private final ScheduledExecutorService loadManagerExecutor;
+ private final UnloadManager unloadManager;
+
private final LoadManagerContext context;
private final ServiceUnitStateChannel channel;
@@ -57,13 +60,16 @@ public class UnloadScheduler implements
LoadManagerScheduler {
private volatile CompletableFuture<Void> currentRunningFuture = null;
public UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+ UnloadManager unloadManager,
LoadManagerContext context,
ServiceUnitStateChannel channel) {
- this(loadManagerExecutor, context, channel,
createNamespaceUnloadStrategy(context.brokerConfiguration()));
+ this(loadManagerExecutor, unloadManager, context,
+ channel,
createNamespaceUnloadStrategy(context.brokerConfiguration()));
}
@VisibleForTesting
protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+ UnloadManager unloadManager,
LoadManagerContext context,
ServiceUnitStateChannel channel,
NamespaceUnloadStrategy strategy) {
@@ -71,6 +77,7 @@ public class UnloadScheduler implements LoadManagerScheduler {
this.recentlyUnloadedBundles = new HashMap<>();
this.recentlyUnloadedBrokers = new HashMap<>();
this.loadManagerExecutor = loadManagerExecutor;
+ this.unloadManager = unloadManager;
this.context = context;
this.conf = context.brokerConfiguration();
this.channel = channel;
@@ -131,9 +138,11 @@ public class UnloadScheduler implements
LoadManagerScheduler {
List<CompletableFuture<Void>> futures = new ArrayList<>();
unloadDecision.getUnloads().forEach((broker, unload) -> {
log.info("[{}] Unloading bundle: {}",
namespaceUnloadStrategy.getClass().getSimpleName(), unload);
-
futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
- recentlyUnloadedBundles.put(unload.serviceUnit(),
System.currentTimeMillis());
- recentlyUnloadedBrokers.put(unload.sourceBroker(),
System.currentTimeMillis());
+
futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload),
unload.serviceUnit(),
+
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS)
+ .thenAccept(__ -> {
+
recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+
recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
}));
});
return FutureUtil.waitForAll(futures).exceptionally(ex -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 33b15926c3c..899539e1db6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -717,6 +717,14 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle
bundle) {
+ return unloadNamespaceBundle(bundle, Optional.empty());
+ }
+
+ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle
bundle, Optional<String> destinationBroker) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return ExtensibleLoadManagerImpl.get(loadManager.get())
+ .unloadNamespaceBundleAsync(bundle, destinationBroker);
+ }
// unload namespace bundle
return unloadNamespaceBundle(bundle,
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5484a70e1aa..321a127ad97 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
@@ -626,14 +627,18 @@ public abstract class PulsarWebResource {
}
}
- protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleOwnershipAsync(NamespaceName fqnn,
- BundlesData bundles, String bundleRange, boolean authoritative,
boolean readOnly) {
+ protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleOwnershipAsync(
+ NamespaceName fqnn, BundlesData bundles, String bundleRange,
+ boolean authoritative, boolean readOnly) {
NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
} catch (WebApplicationException wae) {
return CompletableFuture.failedFuture(wae);
}
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config()))
{
+ return CompletableFuture.completedFuture(nsBundle);
+ }
return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
.thenApply(__ -> nsBundle);
}
@@ -992,6 +997,10 @@ public abstract class PulsarWebResource {
}
protected static boolean isLeaderBroker(PulsarService pulsar) {
+ // The extensible load manager has no leader election service on the
Pulsar broker.
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+ return true;
+ }
return pulsar.getLeaderElectionService().isLeader();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d1bf29725d0..ec82f5c383e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -43,10 +43,12 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -77,18 +79,22 @@ import
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
+import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -124,10 +130,15 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
public void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
super.internalSetup(conf);
pulsar1 = pulsar;
ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
pulsar2 = additionalPulsarTestContext.getPulsarService();
@@ -148,6 +159,14 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
channel2 = (ServiceUnitStateChannelImpl)
FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
+ admin.clusters().createCluster(this.conf.getClusterName(),
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
}
protected void beforePulsarStart(PulsarService pulsar) throws Exception {
@@ -307,6 +326,59 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertTrue(brokerLookupData.isPresent());
}
+ @Test(timeOut = 30 * 1000)
+ public void testUnloadAdminAPI() throws Exception {
+ TopicName topicName = TopicName.get("test-unload");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+ String broker = admin.lookups().lookupTopic(topicName.toString());
+ log.info("Assign the bundle {} to {}", bundle, broker);
+
+ checkOwnershipState(broker, bundle);
+ admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange());
+ assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
+ broker = admin.lookups().lookupTopic(topicName.toString());
+ log.info("Assign the bundle {} to {}", bundle, broker);
+
+ String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+ String dstBrokerServiceUrl;
+ if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+ dstBrokerUrl = pulsar2.getLookupServiceAddress();
+ dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
+ } else {
+ dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
+ }
+ checkOwnershipState(broker, bundle);
+
+ admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange(), dstBrokerUrl);
+
+ assertEquals(admin.lookups().lookupTopic(topicName.toString()),
dstBrokerServiceUrl);
+
+ // Test transfer to current broker.
+ try {
+ admin.namespaces()
+ .unloadNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange(), dstBrokerUrl);
+ fail();
+ } catch (PulsarAdminException ex) {
+ assertTrue(ex.getMessage().contains("cannot be transfer to same
broker"));
+ }
+ }
+
+ private void checkOwnershipState(String broker, NamespaceBundle bundle)
+ throws ExecutionException, InterruptedException {
+ var targetLoadManager = secondaryLoadManager;
+ var otherLoadManager = primaryLoadManager;
+ if (broker.equals(pulsar1.getBrokerServiceUrl())) {
+ targetLoadManager = primaryLoadManager;
+ otherLoadManager = secondaryLoadManager;
+ }
+ assertTrue(targetLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ }
+
+
@Test
public void testGetMetrics() throws Exception {
{
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
new file mode 100644
index 00000000000..75ef913b8a8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class UnloadManagerTest {
+
+ @Test
+ public void testEventPubFutureHasException() {
+ UnloadManager manager = new UnloadManager();
+ CompletableFuture<Void> future =
+ manager.waitAsync(FutureUtil.failedFuture(new
Exception("test")),
+ "bundle-1", 10, TimeUnit.SECONDS);
+
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ assertEquals(ex.getCause().getMessage(), "test");
+ }
+ }
+
+ @Test
+ public void testTimeout() throws IllegalAccessException {
+ UnloadManager manager = new UnloadManager();
+ CompletableFuture<Void> future =
+ manager.waitAsync(CompletableFuture.completedFuture(null),
+ "bundle-1", 3, TimeUnit.SECONDS);
+ Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap =
getInFlightUnloadRequestMap(manager);
+
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ try {
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof TimeoutException);
+ }
+
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+ }
+
+ @Test
+ public void testSuccess() throws IllegalAccessException,
ExecutionException, InterruptedException {
+ UnloadManager manager = new UnloadManager();
+ CompletableFuture<Void> future =
+ manager.waitAsync(CompletableFuture.completedFuture(null),
+ "bundle-1", 5, TimeUnit.SECONDS);
+ Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap =
getInFlightUnloadRequestMap(manager);
+
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Assigning,
"broker-1", VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1",
VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Splitting,
"broker-1", VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Releasing,
"broker-1", VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Init, "broker-1",
VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Free, "broker-1",
VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+ future.get();
+
+ // Success with Owned state.
+ future = manager.waitAsync(CompletableFuture.completedFuture(null),
+ "bundle-1", 5, TimeUnit.SECONDS);
+ inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1",
VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+ future.get();
+ }
+
+ @Test
+ public void testFailedStage() throws IllegalAccessException {
+ UnloadManager manager = new UnloadManager();
+ CompletableFuture<Void> future =
+ manager.waitAsync(CompletableFuture.completedFuture(null),
+ "bundle-1", 5, TimeUnit.SECONDS);
+ Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap =
getInFlightUnloadRequestMap(manager);
+
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+
+ manager.handleEvent("bundle-1",
+ new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1",
VERSION_ID_INIT),
+ new IllegalStateException("Failed stage."));
+
+ try {
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof IllegalStateException);
+ assertEquals(ex.getCause().getMessage(), "Failed stage.");
+ }
+
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+ }
+
+ @Test
+ public void testClose() throws IllegalAccessException {
+ UnloadManager manager = new UnloadManager();
+ CompletableFuture<Void> future =
+ manager.waitAsync(CompletableFuture.completedFuture(null),
+ "bundle-1", 5, TimeUnit.SECONDS);
+ Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap =
getInFlightUnloadRequestMap(manager);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+ manager.close();
+ assertEquals(inFlightUnloadRequestMap.size(), 0);
+
+ try {
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof IllegalStateException);
+ }
+ }
+
+ private Map<String, CompletableFuture<Void>>
getInFlightUnloadRequestMap(UnloadManager manager)
+ throws IllegalAccessException {
+ Map<String, CompletableFuture<Void>> inFlightUnloadRequest =
+ (Map<String, CompletableFuture<Void>>)
FieldUtils.readField(manager, "inFlightUnloadRequest", true);
+
+ return inFlightUnloadRequest;
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
index cda5f81d81b..73d4eb1f18b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -71,17 +73,20 @@ public class UnloadSchedulerTest {
LoadManagerContext context = setupContext();
BrokerRegistry registry = context.brokerRegistry();
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ UnloadManager unloadManager = mock(UnloadManager.class);
NamespaceUnloadStrategy unloadStrategy =
mock(NamespaceUnloadStrategy.class);
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1",
"broker-2")))
.when(registry).getAvailableBrokersAsync();
doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
+ doReturn(CompletableFuture.completedFuture(null)).when(unloadManager)
+ .waitAsync(any(), any(), anyLong(), any());
UnloadDecision decision = new UnloadDecision();
Unload unload = new Unload("broker-1", "bundle-1");
decision.getUnloads().put("broker-1", unload);
doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(),
any(), any());
- UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
context, channel, unloadStrategy);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
unloadManager, context, channel, unloadStrategy);
scheduler.execute();
@@ -101,6 +106,7 @@ public class UnloadSchedulerTest {
LoadManagerContext context = setupContext();
BrokerRegistry registry = context.brokerRegistry();
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ UnloadManager unloadManager = mock(UnloadManager.class);
NamespaceUnloadStrategy unloadStrategy =
mock(NamespaceUnloadStrategy.class);
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
@@ -112,7 +118,7 @@ public class UnloadSchedulerTest {
}
return Lists.newArrayList("broker-1", "broker-2");
},
Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
- UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
context, channel, unloadStrategy);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
unloadManager, context, channel, unloadStrategy);
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
@@ -133,7 +139,8 @@ public class UnloadSchedulerTest {
context.brokerConfiguration().setLoadBalancerEnabled(false);
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
NamespaceUnloadStrategy unloadStrategy =
mock(NamespaceUnloadStrategy.class);
- UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
context, channel, unloadStrategy);
+ UnloadManager unloadManager = mock(UnloadManager.class);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
unloadManager, context, channel, unloadStrategy);
scheduler.execute();
@@ -152,7 +159,8 @@ public class UnloadSchedulerTest {
context.brokerConfiguration().setLoadBalancerEnabled(false);
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
NamespaceUnloadStrategy unloadStrategy =
mock(NamespaceUnloadStrategy.class);
- UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
context, channel, unloadStrategy);
+ UnloadManager unloadManager = mock(UnloadManager.class);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor,
unloadManager, context, channel, unloadStrategy);
doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
scheduler.execute();