This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ff70e661ffe5c867e4f5830afcdbf06e6280e114 Author: sinan liu <[email protected]> AuthorDate: Sun Sep 28 23:12:08 2025 +0800 [fix][broker] Flaky-test: ExtensibleLoadManagerImplTest.testDisableBroker (#24770) (cherry picked from commit e44e084faf66738a22d61892a86d9fc9943bc484) --- .../channel/ServiceUnitStateChannelImpl.java | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 7e864ddac2c..782e4a38bad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -491,6 +491,32 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { String serviceUnit, ServiceUnitState state, Optional<String> owner) { + // When the channel is closed, do not perform liveness verification, return according to the status: + if (channelState == Closed) { + switch (state) { + // Owned/Splitting: Directly return owner (for isOwner judgment as true) + case Owned: + case Splitting: + return CompletableFuture.completedFuture(owner); + case Assigning: + case Releasing: + if (owner.isPresent()) { + if (isTargetBroker(owner.get())) { + // This machine is the target taker, + // return an unfinished future with "waiting for ownership" + return dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable); + } else { + // The target is another broker, return directly so that the upper layer can redirect + return CompletableFuture.completedFuture(owner); + } + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + // Other status: return empty + default: + return CompletableFuture.completedFuture(Optional.empty()); + } + } return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) {
