This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0ee3e002009 [fix][broker] timeout when broker registry hangs and 
monitor broker registry (ExtensibleLoadManagerImpl only) (#23382) (#23510)
0ee3e002009 is described below

commit 0ee3e002009b0a75559898eafcf89427b2e31c9a
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Oct 24 21:28:23 2024 -0700

    [fix][broker] timeout when broker registry hangs and monitor broker 
registry (ExtensibleLoadManagerImpl only) (#23382) (#23510)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |  11 ++-
 .../loadbalance/extensions/BrokerRegistry.java     |   5 +
 .../loadbalance/extensions/BrokerRegistryImpl.java |  69 +++++++++++---
 .../extensions/ExtensibleLoadManagerImpl.java      |  19 +++-
 .../channel/ServiceUnitStateChannelImpl.java       | 105 +++++++++++++++++----
 .../loadbalance/extensions/BrokerRegistryTest.java |  33 +++++++
 .../extensions/ExtensibleLoadManagerImplTest.java  |  24 ++++-
 .../channel/ServiceUnitStateChannelTest.java       |  75 ++++++++++-----
 .../org/apache/pulsar/client/admin/Brokers.java    |  12 ++-
 .../pulsar/client/admin/internal/BrokersImpl.java  |  18 +++-
 10 files changed, 310 insertions(+), 61 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 10b15f6175f..b20312226e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService.State;
@@ -368,20 +369,26 @@ public class BrokersBase extends AdminResource {
     @ApiOperation(value = "Run a healthCheck against the broker")
     @ApiResponses(value = {
         @ApiResponse(code = 200, message = "Everything is OK"),
+        @ApiResponse(code = 307, message = "Current broker is not the target 
broker"),
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")})
     public void healthCheck(@Suspended AsyncResponse asyncResponse,
                             @ApiParam(value = "Topic Version")
-                            @QueryParam("topicVersion") TopicVersion 
topicVersion) {
+                            @QueryParam("topicVersion") TopicVersion 
topicVersion,
+                            @QueryParam("brokerId") String brokerId) {
         validateSuperUserAccessAsync()
                 .thenAccept(__ -> checkDeadlockedThreads())
+                .thenCompose(__ -> maybeRedirectToBroker(
+                        StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() 
: brokerId))
                 .thenCompose(__ -> internalRunHealthCheck(topicVersion))
                 .thenAccept(__ -> {
                     LOG.info("[{}] Successfully run health check.", 
clientAppId());
                     asyncResponse.resume(Response.ok("ok").build());
                 }).exceptionally(ex -> {
-                    LOG.error("[{}] Fail to run health check.", clientAppId(), 
ex);
+                    if (!isRedirectException(ex)) {
+                        LOG.error("[{}] Fail to run health check.", 
clientAppId(), ex);
+                    }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 79dba9c6334..d154edfbb32 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable {
      */
     boolean isStarted();
 
+    /**
+     * Return whether this broker has been registered.
+     */
+    boolean isRegistered();
+
     /**
      * Register local broker to metadata store.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index c3cb8413b01..ef46aa8938f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
 @Slf4j
 public class BrokerRegistryImpl implements BrokerRegistry {
 
+    private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;
+
     private final PulsarService pulsar;
 
     private final ServiceConfiguration conf;
@@ -77,10 +79,11 @@ public class BrokerRegistryImpl implements BrokerRegistry {
     @VisibleForTesting
     final AtomicReference<State> state = new AtomicReference<>(State.Init);
 
-    public BrokerRegistryImpl(PulsarService pulsar) {
+    @VisibleForTesting
+    BrokerRegistryImpl(PulsarService pulsar, MetadataCache<BrokerLookupData> 
brokerLookupDataMetadataCache) {
         this.pulsar = pulsar;
         this.conf = pulsar.getConfiguration();
-        this.brokerLookupDataMetadataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
+        this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache;
         this.scheduler = pulsar.getLoadManagerExecutor();
         this.listeners = new ArrayList<>();
         this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
@@ -98,6 +101,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {
                 pulsar.getBrokerVersion());
     }
 
+    public BrokerRegistryImpl(PulsarService pulsar) {
+        this(pulsar, 
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
+    }
+
     @Override
     public synchronized void start() throws PulsarServerException {
         if (!this.state.compareAndSet(State.Init, State.Started)) {
@@ -117,6 +124,12 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         return state == State.Started || state == State.Registered;
     }
 
+    @Override
+    public boolean isRegistered() {
+        final var state = this.state.get();
+        return state == State.Registered;
+    }
+
     @Override
     public CompletableFuture<Void> registerAsync() {
         final var state = this.state.get();
@@ -126,12 +139,35 @@ public class BrokerRegistryImpl implements BrokerRegistry 
{
         }
         log.info("[{}] Started registering self to {} (state: {})", 
getBrokerId(), brokerIdKeyPath, state);
         return brokerLookupDataMetadataCache.put(brokerIdKeyPath, 
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
-                .thenAccept(__ -> {
-                    this.state.set(State.Registered);
-                    log.info("[{}] Finished registering self", getBrokerId());
+                
.orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS)
+                .whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        this.state.set(State.Registered);
+                        log.info("[{}] Finished registering self", 
getBrokerId());
+                    } else {
+                        log.error("[{}] Failed registering self", 
getBrokerId(), ex);
+                    }
                 });
     }
 
+    private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> 
future) {
+        pulsar.getExecutor().schedule(() -> {
+            registerAsync().whenComplete((__, e) -> {
+                if (e != null) {
+                    doRegisterAsyncWithRetries(retry + 1, future);
+                } else {
+                    future.complete(null);
+                }
+            });
+        }, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), 
TimeUnit.MILLISECONDS);
+    }
+
+    private CompletableFuture<Void> registerAsyncWithRetries() {
+        var retryFuture = new CompletableFuture<Void>();
+        doRegisterAsyncWithRetries(0, retryFuture);
+        return retryFuture;
+    }
+
     @Override
     public synchronized void unregister() throws MetadataStoreException {
         if (state.compareAndSet(State.Registered, State.Unregistering)) {
@@ -218,17 +254,26 @@ public class BrokerRegistryImpl implements BrokerRegistry 
{
             // The registered node is an ephemeral node that could be deleted 
when the metadata store client's session
             // is expired. In this case, we should register again.
             final var brokerId = 
t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
+
+            CompletableFuture<Void> register;
             if (t.getType() == NotificationType.Deleted && 
getBrokerId().equals(brokerId)) {
-                registerAsync();
-            }
-            if (listeners.isEmpty()) {
-                return;
+                this.state.set(State.Started);
+                register = registerAsyncWithRetries();
+            } else {
+                register = CompletableFuture.completedFuture(null);
             }
-            this.scheduler.submit(() -> {
-                for (BiConsumer<String, NotificationType> listener : 
listeners) {
-                    listener.accept(brokerId, t.getType());
+            // Make sure the listeners run only after this broker has re-registered.
+            register.thenAccept(__ -> {
+                if (listeners.isEmpty()) {
+                    return;
                 }
+                this.scheduler.submit(() -> {
+                    for (BiConsumer<String, NotificationType> listener : 
listeners) {
+                        listener.accept(brokerId, t.getType());
+                    }
+                });
             });
+
         } catch (RejectedExecutionException e) {
             // Executor is shutting down
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index bf98df2c5ac..a1f01ea00be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -36,8 +36,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -1007,8 +1009,12 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 return;
             }
 
+            // Monitor broker registry
+            // Periodically check the broker registry in case the metadata store fails.
+            validateBrokerRegistry();
+
             // Monitor role
-            // Periodically check the role in case ZK watcher fails.
+            // Periodically check the role in case the metadata store fails.
             var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
             if (isChannelOwner) {
                 // System topic config might fail due to the race condition
@@ -1093,4 +1099,15 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     private boolean disabled() {
         return state.get() == State.DISABLED;
     }
+
+    private void validateBrokerRegistry()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        var timeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
+        var lookup = 
brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, 
TimeUnit.SECONDS);
+        if (lookup.isEmpty()) {
+            log.warn("Found this broker:{} has not registered yet. Trying to 
register it",
+                    brokerRegistry.getBrokerId());
+            brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1f2715a00ac..58f2e6f30f4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -87,6 +87,7 @@ import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -98,6 +99,7 @@ import 
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -121,6 +123,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS 
= 10;
     private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
     private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 
* 60 * 1000;
+    private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
+    private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
     private final PulsarService pulsar;
     private final ServiceConfiguration config;
     private final Schema<ServiceUnitStateData> schema;
@@ -128,6 +132,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final String brokerId;
     private final Map<String, CompletableFuture<Void>> cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
+
     private BrokerRegistry brokerRegistry;
     private LeaderElectionService leaderElectionService;
     private TableView<ServiceUnitStateData> tableview;
@@ -267,7 +272,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     @Override
     public void cleanOwnerships() {
         disable();
-        doCleanup(brokerId);
+        doCleanup(brokerId, true);
     }
 
     @Override
@@ -383,6 +388,12 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 .get().getLeaderElectionService();
     }
 
+    @VisibleForTesting
+    protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
+        return pulsar.getAdminClient();
+    }
+
+    @Override
     public synchronized void close() throws PulsarServerException {
         channelState = Closed;
         boolean debug = debug();
@@ -491,6 +502,14 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             String serviceUnit,
             ServiceUnitState state,
             Optional<String> owner) {
+
+        // If this broker is not registered (possibly because its connection to the metadata store is failing),
+        // we return the owner without checking whether that owner is active.
+        // This broker serves lookups on a best-effort basis while the metadata store connection is unstable.
+        if (!brokerRegistry.isRegistered()) {
+            return CompletableFuture.completedFuture(owner);
+        }
+
         return dedupeGetOwnerRequest(serviceUnit)
                 .thenCompose(newOwner -> {
                     if (newOwner == null) {
@@ -1259,19 +1278,25 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private void handleBrokerCreationEvent(String broker) {
-        CompletableFuture<Void> future = cleanupJobs.remove(broker);
-        if (future != null) {
-            future.cancel(false);
-            totalInactiveBrokerCleanupCancelledCnt++;
-            log.info("Successfully cancelled the ownership cleanup for 
broker:{}."
-                            + " Active cleanup job count:{}",
-                    broker, cleanupJobs.size());
-        } else {
-            if (debug()) {
-                log.info("No needs to cancel the ownership cleanup for 
broker:{}."
-                                + " There was no scheduled cleanup job. Active 
cleanup job count:{}",
-                        broker, cleanupJobs.size());
-            }
+
+        if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) {
+            healthCheckBrokerAsync(broker)
+                    .thenAccept(__ -> {
+                        CompletableFuture<Void> future = 
cleanupJobs.remove(broker);
+                        if (future != null) {
+                            future.cancel(false);
+                            totalInactiveBrokerCleanupCancelledCnt++;
+                            log.info("Successfully cancelled the ownership 
cleanup for broker:{}."
+                                            + " Active cleanup job count:{}",
+                                    broker, cleanupJobs.size());
+                        } else {
+                            if (debug()) {
+                                log.info("No needs to cancel the ownership 
cleanup for broker:{}."
+                                                + " There was no scheduled 
cleanup job. Active cleanup job count:{}",
+                                        broker, cleanupJobs.size());
+                            }
+                        }
+                    });
         }
     }
 
@@ -1318,7 +1343,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 var future = CompletableFuture
                         .runAsync(() -> {
                                     try {
-                                        doCleanup(broker);
+                                        doCleanup(broker, false);
                                     } catch (Throwable e) {
                                         log.error("Failed to run the cleanup 
job for the broker {}, "
                                                         + 
"totalCleanupErrorCnt:{}.",
@@ -1422,7 +1447,38 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 System.currentTimeMillis() - started);
     }
 
-    private synchronized void doCleanup(String broker)  {
+    private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
+        return future;
+    }
+
+    private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int 
retry, CompletableFuture<Void> future) {
+        try {
+            var admin = getPulsarAdmin();
+            admin.brokers().healthcheckAsync(TopicVersion.V2, 
Optional.of(brokerId))
+                    .whenComplete((__, e) -> {
+                        if (e == null) {
+                            log.info("Completed health-check broker :{}", 
brokerId, e);
+                            future.complete(null);
+                            return;
+                        }
+                        if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
+                            log.error("Failed health-check broker :{}", 
brokerId, e);
+                            
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                        } else {
+                            pulsar.getExecutor()
+                                    .schedule(() -> 
doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
+                                            
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
+                                            MILLISECONDS);
+                        }
+                    });
+        } catch (PulsarServerException e) {
+            future.completeExceptionally(e);
+        }
+    }
+
+    private synchronized void doCleanup(String broker, boolean gracefully) {
         try {
             if 
(getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, 
TimeUnit.SECONDS)
                     .isEmpty()) {
@@ -1435,6 +1491,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             return;
         }
 
+        // If this is not a graceful cleanup, first confirm via a health check that the broker is actually inactive.
+        if (!gracefully) {
+            try {
+                healthCheckBrokerAsync(broker).get(
+                        
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+                log.warn("Found that the broker to clean is healthy. Skip the 
broker:{}'s orphan bundle cleanup",
+                        broker);
+                return;
+            } catch (Exception e) {
+                if (debug()) {
+                    log.info("Failed to check broker:{} health", broker, e);
+                }
+                log.info("Checked the broker:{} health. Continue the orphan 
bundle cleanup", broker);
+            }
+        }
+
+
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 28a2a18500f..941d0e4cbc3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.broker.loadbalance.extensions;
 
 import static 
org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -36,6 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -48,6 +52,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
@@ -396,6 +401,34 @@ public class BrokerRegistryTest {
         assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
     }
 
+    @Test
+    public void testRegisterAsyncTimeout() throws Exception {
+        var pulsar1 = createPulsarService();
+        pulsar1.start();
+        pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1);
+        var metadataCache = mock(MetadataCache.class);
+        var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache);
+
+        // happy case
+        
doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(),
 any(), any());
+        brokerRegistry.start();
+
+        // unhappy case (timeout)
+        doAnswer(invocationOnMock -> {
+            return CompletableFuture.supplyAsync(() -> null, 
CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS));
+        }).when(metadataCache).put(any(), any(), any());
+        try {
+            brokerRegistry.registerAsync().join();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+
+        // happy case again
+        
doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(),
 any(), any());
+        brokerRegistry.registerAsync().join();
+    }
+
+
     private static BrokerRegistryImpl.State getState(BrokerRegistryImpl 
brokerRegistry) {
         return brokerRegistry.state.get();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 7580c165a50..a136e22c178 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -132,6 +132,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -457,14 +458,17 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             assertEquals(unloadCount.get(), 0);
         });
 
-        ServiceUnitStateChannelImpl channel = new 
ServiceUnitStateChannelImpl(pulsar1);
-        channel.start();
+        @Cleanup
+        ServiceUnitStateChannelImpl channel3 = new 
ServiceUnitStateChannelImpl(pulsar1);
+        channel3.start();
+        @Cleanup
+        ServiceUnitStateChannelImpl channel4 = new 
ServiceUnitStateChannelImpl(pulsar2);
+        channel4.start();
         Awaitility.await().untilAsserted(() -> {
             assertEquals(onloadCount.get(), 2);
             assertEquals(unloadCount.get(), 0);
         });
 
-        channel.close();
     }
 
     @DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
@@ -1895,6 +1899,20 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                         TimeUnit.SECONDS).get(2, TimeUnit.SECONDS);
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testMonitorBrokerRegistry() throws MetadataStoreException {
+        primaryLoadManager.getBrokerRegistry().unregister();
+        assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered());
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(30, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> { // wait until true
+                    primaryLoadManager.monitor();
+                    
assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered());
+                });
+    }
+
     private static abstract class MockBrokerFilter implements BrokerFilter {
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index af42649436b..a110746b229 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -90,6 +90,8 @@ import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.client.admin.Brokers;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -129,8 +131,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     private BrokerRegistryImpl registry;
 
+    private PulsarAdmin pulsarAdmin;
+
     private ExtensibleLoadManagerImpl loadManager;
 
+    private Brokers brokers;
+
     @BeforeClass
     @Override
     protected void setup() throws Exception {
@@ -146,7 +152,9 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("public/default");
 
         pulsar1 = pulsar;
-        registry = new BrokerRegistryImpl(pulsar);
+        registry = spy(new BrokerRegistryImpl(pulsar1));
+        registry.start();
+        pulsarAdmin = spy(pulsar.getAdminClient());
         loadManagerContext = mock(LoadManagerContext.class);
         
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
         
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
@@ -177,6 +185,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         childBundle31 = "public/default3/" + childBundle1Range;
         childBundle32 = "public/default3/" + childBundle2Range;
+
+        brokers = mock(Brokers.class);
+        doReturn(CompletableFuture.failedFuture(new 
RuntimeException("failed"))).when(brokers)
+                .healthcheckAsync(any(), any());
     }
 
     @BeforeMethod
@@ -689,17 +701,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     @Test(priority = 8)
     public void handleBrokerCreationEventTest() throws IllegalAccessException {
         var cleanupJobs = getCleanupJobs(channel1);
-        String broker = "broker-1";
+        String broker = brokerId2;
         var future = new CompletableFuture();
         cleanupJobs.put(broker, future);
-        channel1.handleBrokerRegistrationEvent(broker, NotificationType.Created);
-        assertEquals(0, cleanupJobs.size());
-        assertTrue(future.isCancelled());
+        ((ServiceUnitStateChannelImpl) channel1).handleBrokerRegistrationEvent(broker, NotificationType.Created);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, cleanupJobs.size());
+            assertTrue(future.isCancelled());
+        });
     }
 
     @Test(priority = 9)
-    public void handleBrokerDeletionEventTest()
-            throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
+    public void handleBrokerDeletionEventTest() throws Exception {
 
         var cleanupJobs1 = getCleanupJobs(channel1);
         var cleanupJobs2 = getCleanupJobs(channel2);
@@ -752,6 +765,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
         FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp",
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+
+        doReturn(brokers).when(pulsarAdmin).brokers();
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted);
@@ -809,6 +824,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 3,
                 0,
                 0);
+        reset(pulsarAdmin);
 
         // broker is back online
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created);
@@ -833,6 +849,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
 
         // broker is offline again
+        doReturn(brokers).when(pulsarAdmin).brokers();
         FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true);
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
@@ -874,6 +891,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 4,
                 0,
                 1);
+        reset(pulsarAdmin);
 
         // test unstable state
         channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker)));
@@ -1540,9 +1558,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
         FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp",
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+
+        doReturn(brokers).when(pulsarAdmin).brokers();
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
 
+
         waitUntilNewOwner(channel2, releasingBundle, brokerId2);
         waitUntilNewOwner(channel2, childBundle11, brokerId2);
         waitUntilNewOwner(channel2, childBundle12, brokerId2);
@@ -1558,7 +1579,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         // clean-up
         FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
         cleanTableViews();
-
+        reset(pulsarAdmin);
     }
 
     @Test(priority = 18)
@@ -1675,7 +1696,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         // case 3: the bundle ownership is transferring, and the dst broker is the channel owner
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1));
-        assertTrue(!channel1.getOwnerAsync(bundle).isDone());
+        assertFalse(channel1.getOwnerAsync(bundle).isDone());
 
         // case 4: the bundle ownership is found
         overrideTableViews(bundle,
@@ -1684,18 +1705,15 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(owner, broker);
 
         // case 5: the owner lookup gets delayed
-        var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
-        FieldUtils.writeDeclaredField(channel1,
-                "brokerRegistry", spyRegistry , true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1000, true);
         var delayedFuture = new CompletableFuture();
-        doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker));
+        doReturn(delayedFuture).when(registry).lookupAsync(eq(broker));
         CompletableFuture.runAsync(() -> {
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();;
+                Thread.currentThread().interrupt();
             }
             delayedFuture.complete(Optional.of(broker));
         });
@@ -1708,7 +1726,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
         // case 6: the owner is inactive
         doReturn(CompletableFuture.completedFuture(Optional.empty()))
-                .when(spyRegistry).lookupAsync(eq(broker));
+                .when(registry).lookupAsync(eq(broker));
 
         // verify getOwnerAsync times out
         start = System.currentTimeMillis();
@@ -1716,19 +1734,32 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(ex.getCause() instanceof IllegalStateException);
         assertTrue(System.currentTimeMillis() - start >= 1000);
 
+        try {
+            // verify getOwnerAsync returns immediately when not registered
+            registry.unregister();
+            start = System.currentTimeMillis();
+            assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
+            elapsed = System.currentTimeMillis() - start;
+            assertTrue(elapsed < 1000);
+        } finally {
+            registry.registerAsync().join();
+        }
+
+
         // case 7: the ownership cleanup(no new owner) by the leader channel
         doReturn(CompletableFuture.completedFuture(Optional.empty()))
                 .when(loadManager).selectAsync(any(), any(), any());
-        var leaderChannel = channel1;
+        ServiceUnitStateChannelImpl leaderChannel = (ServiceUnitStateChannelImpl) channel1;
         String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         assertEquals(leader1, leader2);
         if (leader1.equals(brokerId2)) {
-            leaderChannel = channel2;
+            leaderChannel = (ServiceUnitStateChannelImpl) channel2;
         }
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+        doReturn(brokers).when(pulsarAdmin).brokers();
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
 
         // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
@@ -1740,7 +1771,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         waitUntilState(channel2, bundle, Init);
 
         assertTrue(System.currentTimeMillis() - start < 20_000);
-
+        reset(pulsarAdmin);
         // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
@@ -1750,6 +1781,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
         getCleanupJobs(leaderChannel).clear();
+        doReturn(brokers).when(pulsarAdmin).brokers();
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
 
         // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout
@@ -1760,10 +1792,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         // test clean-up
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
-        FieldUtils.writeDeclaredField(channel1,
-                "brokerRegistry", registry , true);
         cleanTableViews();
-
+        reset(pulsarAdmin);
     }
 
     @Test(priority = 20)
@@ -2104,7 +2134,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     }
 
     ServiceUnitStateChannelImpl createChannel(PulsarService pulsar)
-            throws IllegalAccessException {
+            throws IllegalAccessException, PulsarServerException {
         var tmpChannel = new ServiceUnitStateChannelImpl(pulsar);
         FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true);
         var channel = spy(tmpChannel);
@@ -2112,6 +2142,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         doReturn(loadManagerContext).when(channel).getContext();
         doReturn(registry).when(channel).getBrokerRegistry();
         doReturn(loadManager).when(channel).getLoadManager();
+        doReturn(pulsarAdmin).when(channel).getPulsarAdmin();
 
 
         var leaderElectionService = new LeaderElectionService(
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index dc0b7c9885a..eed73f38282 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -320,10 +321,19 @@ public interface Brokers {
      */
     void healthcheck(TopicVersion topicVersion) throws PulsarAdminException;
 
+    /**
+     * Run a healthcheck on the target broker or on the broker.
+     * @param brokerId target broker id to check the health. If empty, it checks the health on the connected broker.
+     *
+     * @throws PulsarAdminException if the healthcheck fails.
+     */
+    void healthcheck(TopicVersion topicVersion, Optional<String> brokerId) throws PulsarAdminException;
+
     /**
      * Run a healthcheck on the broker asynchronously.
      */
-    CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
+    CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion, Optional<String> brokerId);
+
 
     /**
      * Trigger the current broker to graceful-shutdown asynchronously.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index b82c3fd0f41..35b261b196e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
@@ -168,26 +169,35 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     @Deprecated
     public void healthcheck() throws PulsarAdminException {
-        healthcheck(TopicVersion.V1);
+        healthcheck(TopicVersion.V1, Optional.empty());
     }
 
     @Override
     @Deprecated
     public CompletableFuture<Void> healthcheckAsync() {
-        return healthcheckAsync(TopicVersion.V1);
+        return healthcheckAsync(TopicVersion.V1, Optional.empty());
     }
 
+
     @Override
     public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException {
-        sync(() -> healthcheckAsync(topicVersion));
+        sync(() -> healthcheckAsync(topicVersion, Optional.empty()));
     }
 
     @Override
-    public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion) {
+    public void healthcheck(TopicVersion topicVersion, Optional<String> brokerId) throws PulsarAdminException {
+        sync(() -> healthcheckAsync(topicVersion, brokerId));
+    }
+
+    @Override
+    public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion, Optional<String> brokerId) {
         WebTarget path = adminBrokers.path("health");
         if (topicVersion != null) {
             path = path.queryParam("topicVersion", topicVersion);
         }
+        if (brokerId.isPresent()) {
+            path = path.queryParam("brokerId", brokerId.get());
+        }
         final CompletableFuture<Void> future = new CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<String>() {


Reply via email to