This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9e5c03a995ddaff01917a3877e84a5ea30927a9
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Thu Apr 24 19:11:51 2025 +0300

    [fix][broker] Fix broker shutdown delay by resolving hanging health checks (#24210)
    
    (cherry picked from commit 12961caf4967b03634bb443d4718eb30a2a1f0ae)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 271 ++++++++++-------
 .../pulsar/broker/admin/impl/BrokersBase.java      | 149 +---------
 .../pulsar/broker/service/HealthChecker.java       | 330 +++++++++++++++++++++
 .../broker/admin/AdminApiHealthCheckTest.java      |  23 +-
 .../systopic/PartitionedSystemTopicTest.java       |  10 +-
 .../testcontext/NonStartableTestPulsarService.java |   6 +-
 6 files changed, 531 insertions(+), 258 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index cb63d1db71d..308c3459065 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.HealthChecker;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
@@ -145,6 +146,7 @@ import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
@@ -300,6 +302,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private volatile CompletableFuture<Void> closeFuture;
     // key is listener name, value is pulsar address and pulsar ssl address
     private Map<String, AdvertisedListener> advertisedListeners;
+    private volatile HealthChecker healthChecker;
 
     public PulsarService(ServiceConfiguration config) {
         this(config, Optional.empty(), (exitCode) -> LOG.info("Process 
termination requested with code {}. "
@@ -476,6 +479,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             // It only tells the Pulsar clients that this service is not ready 
to serve for the lookup requests
             state = State.Closing;
 
+            if (healthChecker != null) {
+                healthChecker.close();
+                healthChecker = null;
+            }
+
             // close the service in reverse order v.s. in which they are 
started
             if (this.resourceUsageTransportManager != null) {
                 try {
@@ -1609,76 +1617,36 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return this.offloaderReadExecutor;
     }
 
-    public PulsarClientImpl createClientImpl(ClientConfigurationData 
clientConf)
+    public PulsarClientImpl createClientImpl(ClientConfigurationData conf) 
throws PulsarClientException {
+        return createClientImpl(conf, null);
+    }
+
+    public PulsarClientImpl 
createClientImpl(Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
+            throws PulsarClientException {
+        return createClientImpl(null, customizer);
+    }
+
+    public PulsarClientImpl createClientImpl(ClientConfigurationData conf,
+                                             
Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
             throws PulsarClientException {
-        return PulsarClientImpl.builder()
-                .conf(clientConf)
+        PulsarClientImpl.PulsarClientImplBuilder pulsarClientImplBuilder = 
PulsarClientImpl.builder()
+                .conf(conf != null ? conf : createClientConfigurationData())
                 .eventLoopGroup(ioEventLoopGroup)
                 .timer(brokerClientSharedTimer)
                 
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
                 
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
                 
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
-                
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
-                .build();
+                
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider);
+        if (customizer != null) {
+            customizer.accept(pulsarClientImplBuilder);
+        }
+        return pulsarClientImplBuilder.build();
     }
 
     public synchronized PulsarClient getClient() throws PulsarServerException {
         if (this.client == null) {
             try {
-                ClientConfigurationData initialConf = new 
ClientConfigurationData();
-
-                // Disable memory limit for broker client and disable stats
-                initialConf.setMemoryLimitBytes(0);
-                initialConf.setStatsIntervalSeconds(0);
-
-                // Apply all arbitrary configuration. This must be called 
before setting any fields annotated as
-                // @Secret on the ClientConfigurationData object because of 
the way they are serialized.
-                // See https://github.com/apache/pulsar/issues/8509 for more 
information.
-                Map<String, Object> overrides = PropertiesUtils
-                        
.filterAndMapProperties(this.getConfiguration().getProperties(), 
"brokerClient_");
-                ClientConfigurationData conf =
-                        ConfigurationDataUtils.loadData(overrides, 
initialConf, ClientConfigurationData.class);
-
-                // Disabled auto release useless connections
-                // The automatic release connection feature is not yet perfect 
for transaction scenarios, so turn it
-                // off first.
-                conf.setConnectionMaxIdleSeconds(-1);
-
-                boolean tlsEnabled = 
this.getConfiguration().isBrokerClientTlsEnabled();
-                conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : 
this.brokerServiceUrl);
-
-                if (tlsEnabled) {
-                    
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
-                    
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
-                    
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
-                    
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
-                    if 
(this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
-                        conf.setUseKeyStoreTls(true);
-                        
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
-                        
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
-                        
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
-                        
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
-                        
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
-                        
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
-                    } else {
-                        conf.setTlsTrustCertsFilePath(
-                                
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
-                                        ? 
this.getConfiguration().getBrokerClientTrustCertsFilePath()
-                                        : 
this.getConfiguration().getTlsTrustCertsFilePath());
-                        
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
-                        
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
-                    }
-                }
-
-                if 
(isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
-                    
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
-                    
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
-                    conf.setAuthParamMap(null);
-                    conf.setAuthentication(AuthenticationFactory.create(
-                            
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
-                            
this.getConfiguration().getBrokerClientAuthenticationParameters()));
-                }
-                this.client = createClientImpl(conf);
+                this.client = createClientImpl(null, null);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
             }
@@ -1686,59 +1654,120 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return this.client;
     }
 
+    protected ClientConfigurationData createClientConfigurationData()
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        ClientConfigurationData initialConf = new ClientConfigurationData();
+
+        // Disable memory limit for broker client and disable stats
+        initialConf.setMemoryLimitBytes(0);
+        initialConf.setStatsIntervalSeconds(0);
+
+        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+        Map<String, Object> overrides = PropertiesUtils
+                
.filterAndMapProperties(this.getConfiguration().getProperties(), 
"brokerClient_");
+        ClientConfigurationData conf =
+                ConfigurationDataUtils.loadData(overrides, initialConf, 
ClientConfigurationData.class);
+
+        // Disabled auto release useless connections
+        // The automatic release connection feature is not yet perfect for 
transaction scenarios, so turn it
+        // off first.
+        conf.setConnectionMaxIdleSeconds(-1);
+
+        boolean tlsEnabled = 
this.getConfiguration().isBrokerClientTlsEnabled();
+        conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : 
this.brokerServiceUrl);
+
+        if (tlsEnabled) {
+            
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
+            
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
+            
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
+            
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
+            if 
(this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
+                conf.setUseKeyStoreTls(true);
+                
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
+                
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
+                
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
+                
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
+                
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
+                
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
+            } else {
+                conf.setTlsTrustCertsFilePath(
+                        
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
+                                ? 
this.getConfiguration().getBrokerClientTrustCertsFilePath()
+                                : 
this.getConfiguration().getTlsTrustCertsFilePath());
+                
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
+                
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
+            }
+        }
+
+        if 
(isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
+            
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
+            
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
+            conf.setAuthParamMap(null);
+            conf.setAuthentication(AuthenticationFactory.create(
+                    
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
+                    
this.getConfiguration().getBrokerClientAuthenticationParameters()));
+        }
+        return conf;
+    }
+
     public synchronized PulsarAdmin getAdminClient() throws 
PulsarServerException {
         if (this.adminClient == null) {
             try {
-                ServiceConfiguration conf = this.getConfiguration();
-                final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? 
webServiceAddressTls : webServiceAddress;
-                if (adminApiUrl == null) {
-                    throw new IllegalArgumentException("Web service address 
was not set properly "
-                            + ", isBrokerClientTlsEnabled: " + 
conf.isBrokerClientTlsEnabled()
-                            + ", webServiceAddressTls: " + webServiceAddressTls
-                            + ", webServiceAddress: " + webServiceAddress);
-                }
-                PulsarAdminBuilder builder = 
PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
-
-                // Apply all arbitrary configuration. This must be called 
before setting any fields annotated as
-                // @Secret on the ClientConfigurationData object because of 
the way they are serialized.
-                // See https://github.com/apache/pulsar/issues/8509 for more 
information.
-                
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), 
"brokerClient_"));
+                this.adminClient = getCreateAdminClientBuilder().build();
+                LOG.info("created admin with url {} ", 
adminClient.getServiceUrl());
+            } catch (Exception e) {
+                throw new PulsarServerException(e);
+            }
+        }
+        return this.adminClient;
+    }
 
-                builder.authentication(
-                        conf.getBrokerClientAuthenticationPlugin(),
-                        conf.getBrokerClientAuthenticationParameters());
-
-                if (conf.isBrokerClientTlsEnabled()) {
-                    builder.tlsCiphers(config.getBrokerClientTlsCiphers())
-                            
.tlsProtocols(config.getBrokerClientTlsProtocols());
-                    if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
-                        
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
-                                
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
-                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
-                                
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
-                                
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
-                                
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
-                    } else {
-                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
-                                
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
-                                
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
-                    }
-                    
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
-                            
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
-                }
+    protected PulsarAdminBuilder getCreateAdminClientBuilder()
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        ServiceConfiguration conf = this.getConfiguration();
+        final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? 
webServiceAddressTls : webServiceAddress;
+        if (adminApiUrl == null) {
+            throw new IllegalArgumentException("Web service address was not 
set properly "
+                    + ", isBrokerClientTlsEnabled: " + 
conf.isBrokerClientTlsEnabled()
+                    + ", webServiceAddressTls: " + webServiceAddressTls
+                    + ", webServiceAddress: " + webServiceAddress);
+        }
+        PulsarAdminBuilder builder = 
PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
 
-                // most of the admin request requires to make zk-call so, keep 
the max read-timeout based on
-                // zk-operation timeout
-                
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+                
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), 
"brokerClient_"));
 
-                this.adminClient = builder.build();
-                LOG.info("created admin with url {} ", adminApiUrl);
-            } catch (Exception e) {
-                throw new PulsarServerException(e);
+        builder.authentication(
+                conf.getBrokerClientAuthenticationPlugin(),
+                conf.getBrokerClientAuthenticationParameters());
+
+        if (conf.isBrokerClientTlsEnabled()) {
+            builder.tlsCiphers(config.getBrokerClientTlsCiphers())
+                    .tlsProtocols(config.getBrokerClientTlsProtocols());
+            if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
+                
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+                        .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
+                        
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
+                        .tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
+                        .tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
+                        
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
+            } else {
+                
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
+                        .tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
+                        
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
             }
+            
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
+                    
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
         }
 
-        return this.adminClient;
+        // most of the admin request requires to make zk-call so, keep the max 
read-timeout based on
+        // zk-operation timeout
+        builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        return builder;
     }
 
     /**
@@ -2069,4 +2098,40 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             mutex.unlock();
         }
     }
+
+    /**
+     * Run health check for the broker.
+     *
+     * @return CompletableFuture
+     */
+    public CompletableFuture<Void> runHealthCheck(TopicVersion topicVersion, 
String clientId) {
+        if (!isRunning()) {
+            return CompletableFuture.failedFuture(new 
PulsarServerException("Broker is not running"));
+        }
+        HealthChecker localHealthChecker = getHealthChecker();
+        if (localHealthChecker == null) {
+            return CompletableFuture.failedFuture(new 
PulsarServerException("Broker is not running"));
+        }
+        return localHealthChecker.checkHealth(topicVersion, clientId);
+    }
+
+    @VisibleForTesting
+    public HealthChecker getHealthChecker() {
+        if (healthChecker == null) {
+            synchronized (this) {
+                if (healthChecker == null) {
+                    if (!isRunning()) {
+                        return null;
+                    }
+                    try {
+                        healthChecker = new HealthChecker(this);
+                    } catch (PulsarServerException e) {
+                        LOG.error("Failed to create health checker", e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+        return healthChecker;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3b5bc66f5d1..ea25367ca3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -26,16 +26,11 @@ import io.swagger.annotations.ApiResponses;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
-import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -50,22 +45,12 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.PulsarVersion;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService.State;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.BrokerOperation;
@@ -80,16 +65,9 @@ import org.slf4j.LoggerFactory;
  */
 public class BrokersBase extends AdminResource {
     private static final Logger LOG = 
LoggerFactory.getLogger(BrokersBase.class);
-    public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
     // log a full thread dump when a deadlock is detected in healthcheck once 
every 10 minutes
     // to prevent excessive logging
     private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 
600000L;
-    // there is a timeout of 60 seconds default in the client(readTimeoutMs), 
so we need to set the timeout
-    // a bit shorter than 60 seconds to avoid the client timeout exception 
thrown before the server timeout exception.
-    // or we can't propagate the server timeout exception to the client.
-    private static final Duration HEALTH_CHECK_READ_TIMEOUT = 
Duration.ofSeconds(58);
-    private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
-            FutureUtil.createTimeoutException("Timeout", BrokersBase.class, 
"healthCheckRecursiveReadNext(...)");
     private static volatile long threadDumpLoggedTimestamp;
 
     @GET
@@ -384,16 +362,21 @@ public class BrokersBase extends AdminResource {
         @ApiResponse(code = 307, message = "Current broker is not the target 
broker"),
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
-        @ApiResponse(code = 500, message = "Internal server error")})
+        @ApiResponse(code = 500, message = "Internal server error"),
+        @ApiResponse(code = 503, message = "Service unavailable")})
     public void healthCheck(@Suspended AsyncResponse asyncResponse,
                             @ApiParam(value = "Topic Version")
                             @QueryParam("topicVersion") TopicVersion 
topicVersion,
                             @QueryParam("brokerId") String brokerId) {
+        if (pulsar().getState() == State.Closed || pulsar().getState() == 
State.Closing) {
+            
asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE).build());
+            return;
+        }
         
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), 
StringUtils.isBlank(brokerId)
                 ? pulsar().getBrokerId() : brokerId, 
BrokerOperation.HEALTH_CHECK)
-                .thenAccept(__ -> checkDeadlockedThreads())
                 .thenCompose(__ -> maybeRedirectToBroker(
                         StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() 
: brokerId))
+                .thenAccept(__ -> checkDeadlockedThreads())
                 .thenCompose(__ -> internalRunHealthCheck(topicVersion))
                 .thenAccept(__ -> {
                     LOG.info("[{}] Successfully run health check.", 
clientAppId());
@@ -431,124 +414,8 @@ public class BrokersBase extends AdminResource {
         }
     }
 
-
     private CompletableFuture<Void> internalRunHealthCheck(TopicVersion 
topicVersion) {
-        String brokerId = pulsar().getBrokerId();
-        NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
-                ? NamespaceService.getHeartbeatNamespaceV2(brokerId, 
pulsar().getConfiguration())
-                : NamespaceService.getHeartbeatNamespace(brokerId, 
pulsar().getConfiguration());
-        final String topicName = String.format("persistent://%s/%s", 
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
-        LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
-        final String messageStr = UUID.randomUUID().toString();
-        final String subscriptionName = "healthCheck-" + messageStr;
-        // create non-partitioned topic manually and close the previous reader 
if present.
-        return pulsar().getBrokerService().getTopic(topicName, true)
-            .thenCompose(topicOptional -> {
-                if (!topicOptional.isPresent()) {
-                    LOG.error("[{}] Fail to run health check while get topic 
{}. because get null value.",
-                            clientAppId(), topicName);
-                    throw new RestException(Status.NOT_FOUND,
-                            String.format("Topic [%s] not found after 
create.", topicName));
-                }
-                PulsarClient client;
-                try {
-                    client = pulsar().getClient();
-                } catch (PulsarServerException e) {
-                    LOG.error("[{}] Fail to run health check while get 
client.", clientAppId());
-                    throw new RestException(e);
-                }
-                CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
-                
client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                        .thenCompose(producer -> 
client.newReader(Schema.STRING).topic(topicName)
-                                .subscriptionName(subscriptionName)
-                                .startMessageId(MessageId.latest)
-                                .createAsync().exceptionally(createException 
-> {
-                                    producer.closeAsync().exceptionally(ex -> {
-                                        LOG.error("[{}] Close producer fail 
while heath check.", clientAppId());
-                                        return null;
-                                    });
-                                    throw 
FutureUtil.wrapToCompletionException(createException);
-                                }).thenCompose(reader -> 
producer.sendAsync(messageStr)
-                                        .thenCompose(__ -> 
FutureUtil.addTimeoutHandling(
-                                                
healthCheckRecursiveReadNext(reader, messageStr),
-                                                HEALTH_CHECK_READ_TIMEOUT, 
pulsar().getBrokerService().executor(),
-                                                () -> 
HEALTH_CHECK_TIMEOUT_EXCEPTION))
-                                        .whenComplete((__, ex) -> {
-                                            closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName)
-                                                    .whenComplete((unused, 
innerEx) -> {
-                                                        if (ex != null) {
-                                                            
resultFuture.completeExceptionally(ex);
-                                                        } else {
-                                                            
resultFuture.complete(null);
-                                                        }
-                                                    });
-                                        }
-                                ))
-                        ).exceptionally(ex -> {
-                            resultFuture.completeExceptionally(ex);
-                            return null;
-                        });
-                return resultFuture;
-            });
-    }
-
-    /**
-     * Close producer and reader and then to re-check if this operation is 
success.
-     *
-     * Re-check
-     * - Producer: If close fails we will print error log to notify user.
-     * - Consumer: If close fails we will force delete subscription.
-     *
-     * @param producer Producer
-     * @param reader Reader
-     * @param topic  Topic
-     * @param subscriptionName  Subscription name
-     */
-    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, 
Reader<String> reader,
-                                                    Topic topic, String 
subscriptionName) {
-        // no matter exception or success, we still need to
-        // close producer/reader
-        CompletableFuture<Void> producerFuture = producer.closeAsync();
-        CompletableFuture<Void> readerFuture = reader.closeAsync();
-        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
-        futures.add(producerFuture);
-        futures.add(readerFuture);
-        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
-                .exceptionally(closeException -> {
-                    if (readerFuture.isCompletedExceptionally()) {
-                        LOG.error("[{}] Close reader fail while heath check.", 
clientAppId());
-                        Subscription subscription =
-                                topic.getSubscription(subscriptionName);
-                        // re-check subscription after reader close
-                        if (subscription != null) {
-                            LOG.warn("[{}] Force delete subscription {} "
-                                            + "when it still exists after the"
-                                            + " reader is closed.",
-                                    clientAppId(), subscription);
-                            subscription.deleteForcefully()
-                                    .exceptionally(ex -> {
-                                        LOG.error("[{}] Force delete 
subscription fail"
-                                                        + " while health 
check",
-                                                clientAppId(), ex);
-                                        return null;
-                                    });
-                        }
-                    } else {
-                        // producer future fail.
-                        LOG.error("[{}] Close producer fail while heath 
check.", clientAppId());
-                    }
-                    return null;
-                });
-    }
-
-    private CompletableFuture<Void> 
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
-        return reader.readNextAsync()
-                .thenCompose(msg -> {
-                    if (!Objects.equals(content, msg.getValue())) {
-                        return healthCheckRecursiveReadNext(reader, content);
-                    }
-                    return CompletableFuture.completedFuture(null);
-                });
+        return pulsar().runHealthCheck(topicVersion, clientAppId());
     }
 
     private CompletableFuture<Void> 
internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java
new file mode 100644
index 00000000000..e80e0b09475
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicVersion;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+/**
+ * The HealthChecker class provides functionality to monitor and verify the 
health of a Pulsar broker.
+ * It performs health checks by creating test topics, producing and consuming 
messages to verify broker functionality.
+ * This class implements AutoCloseable to ensure proper cleanup of resources 
when the broker is shut down.
+ */
+@Slf4j
+public class HealthChecker implements AutoCloseable{
+    /**
+     * Suffix used for health check topic names.
+     */
+    public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
+    /**
+     * Timeout duration for health check operations.
+     * Set to 58 seconds to be shorter than the client's default 60-second 
timeout,
+     * allowing server timeout exceptions to propagate properly to the client.
+     */
+    private static final Duration DEFAULT_HEALTH_CHECK_READ_TIMEOUT = 
Duration.ofSeconds(58);
+    /**
+     * Pre-created timeout exception for health check operations.
+     */
+    private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Timeout", HealthChecker.class, 
"healthCheckRecursiveReadNext(...)");
+    /**
+     * Reference to the main Pulsar service.
+     */
+    private final PulsarService pulsar;
+    /**
+     * Topic name for v1 heartbeat checks.
+     */
+    private final String heartbeatTopicV1;
+    /**
+     * Topic name for v2 heartbeat checks.
+     */
+    private final String heartbeatTopicV2;
+    /**
+     * Pulsar client instance for health check operations.
+     * A separate client is needed so that it can be shutdown before the 
webservice is closed.
+     * Pending requests for healthchecks to the /health endpoint can be 
cancelled this way.
+     */
+    private final PulsarClient client;
+    /**
+     * Executor for lookup operations.
+     * This is also needed so that pending healthchecks can be properly 
cancelled at shutdown.
+     */
+    private final ScheduledExecutorProvider lookupExecutor;
+    /**
+     * Executor for scheduled tasks.
+     * This is also needed so that pending healthchecks can be properly 
cancelled at shutdown.
+     */
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
+    /**
+     * Set of pending health check operations.
+     */
+    private final Set<CompletableFuture<Void>> pendingFutures = new 
HashSet<>();
+
+    private final Duration timeout = DEFAULT_HEALTH_CHECK_READ_TIMEOUT;
+
+    /**
+     * Creates a health checker bound to the given broker.
+     * Resolves the v1 and v2 heartbeat topic names for this broker and builds a dedicated client that is
+     * configured with its own lookup and scheduled executor providers.
+     *
+     * @param pulsar the broker service this checker belongs to
+     * @throws PulsarServerException if the dedicated client cannot be created
+     */
+    public HealthChecker(PulsarService pulsar) throws PulsarServerException {
+        final String brokerId = pulsar.getBrokerId();
+        final ServiceConfiguration configuration = pulsar.getConfiguration();
+        this.pulsar = pulsar;
+        this.heartbeatTopicV1 = getHeartbeatTopicName(brokerId, configuration, false);
+        this.heartbeatTopicV2 = getHeartbeatTopicName(brokerId, configuration, true);
+        this.lookupExecutor = new ScheduledExecutorProvider(1, "health-checker-client-lookup-executor");
+        this.scheduledExecutorProvider =
+                new ScheduledExecutorProvider(1, "health-checker-client-scheduled-executor");
+        try {
+            this.client = pulsar.createClientImpl(clientBuilder -> {
+                clientBuilder.lookupExecutorProvider(lookupExecutor);
+                clientBuilder.scheduledExecutorProvider(scheduledExecutorProvider);
+            });
+        } catch (PulsarClientException clientException) {
+            throw new PulsarServerException("Error creating client for HealthChecker", clientException);
+        }
+    }
+
+    /**
+     * Builds the persistent health check topic name inside the broker's heartbeat namespace.
+     *
+     * @param brokerId      identifier of the broker that owns the heartbeat namespace
+     * @param configuration broker configuration handed to the namespace lookup
+     * @param isV2          {@code true} to use the v2 heartbeat namespace, {@code false} for v1
+     * @return the topic name in the form {@code persistent://<namespace>/healthcheck}
+     */
+    private static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) {
+        final NamespaceName heartbeatNamespace;
+        if (isV2) {
+            heartbeatNamespace = NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration);
+        } else {
+            heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, configuration);
+        }
+        return String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);
+    }
+
+    /**
+     * Performs a health check on the broker by verifying message production and consumption.
+     * The health check process includes:
+     * 1. Looking up the heartbeat topic for the requested version
+     * 2. Producing a test message carrying a random UUID
+     * 3. Reading messages back until that UUID is seen, to verify end-to-end functionality
+     *
+     * The returned future is tracked in the pending set until it completes, so that {@link #close()}
+     * can fail it when the checker is shut down while the check is still running.
+     *
+     * @param topicVersion The version of the topic to use; V2 selects the v2 heartbeat topic, any other
+     *                     value selects the v1 heartbeat topic
+     * @param clientAppId  The identifier of the client application requesting the health check
+     *                     (used in log messages only)
+     * @return A CompletableFuture that completes when the health check is successful, or completes
+     * exceptionally if the check fails
+     */
+    public CompletableFuture<Void> checkHealth(TopicVersion topicVersion, 
String clientAppId) {
+        final String topicName = topicVersion == TopicVersion.V2 ? 
heartbeatTopicV2 : heartbeatTopicV1;
+        log.info("[{}] Running healthCheck with topic={}", clientAppId, 
topicName);
+        // The random UUID is both the message payload and part of the subscription name.
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
+        // Track the result future; it unregisters itself from the pending set once it completes.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        addToPending(resultFuture);
+        resultFuture.whenComplete((result, ex) -> {
+            removeFromPending(resultFuture);
+        });
+        try {
+            // NOTE(review): the second argument presumably asks the broker to create the topic when it is
+            // missing (the error below says "not found after create") - confirm against BrokerService.
+            pulsar.getBrokerService().getTopic(topicName, true)
+                    .thenCompose(topicOptional -> {
+                        if (!topicOptional.isPresent()) {
+                            log.error("[{}] Fail to run health check while get 
topic {}. because get null value.",
+                                    clientAppId, topicName);
+                            return CompletableFuture.failedFuture(new 
BrokerServiceException.TopicNotFoundException(
+                                    String.format("Topic [%s] not found after 
create.", topicName)));
+                        }
+                        return doHealthCheck(clientAppId, topicName, 
subscriptionName, messageStr, resultFuture);
+                    }).whenComplete((result, t) -> {
+                        // doHealthCheck may already have completed resultFuture; the failure path is a
+                        // no-op in that case and the success path is guarded explicitly.
+                        if (t != null) {
+                            resultFuture.completeExceptionally(t);
+                        } else {
+                            if (!resultFuture.isDone()) {
+                                resultFuture.complete(null);
+                            }
+                        }
+                    });
+        } catch (Exception e) {
+            // Failures thrown synchronously while starting the lookup are reported through the future too.
+            log.error("[{}] Fail to run health check while get topic {}. 
because get exception.",
+                    clientAppId, topicName, e);
+            resultFuture.completeExceptionally(e);
+        }
+        return resultFuture;
+    }
+
+    /**
+     * Registers a health check future in the pending set while holding this instance's monitor.
+     *
+     * @param resultFuture the future to start tracking
+     */
+    private void addToPending(CompletableFuture<Void> resultFuture) {
+        synchronized (this) {
+            pendingFutures.add(resultFuture);
+        }
+    }
+
+    /**
+     * Drops a health check future from the pending set while holding this instance's monitor.
+     *
+     * @param resultFuture the future to stop tracking
+     */
+    private void removeFromPending(CompletableFuture<Void> resultFuture) {
+        synchronized (this) {
+            pendingFutures.remove(resultFuture);
+        }
+    }
+
+    /**
+     * Runs the produce/read round trip of a health check on the dedicated client.
+     *
+     * Steps, in order:
+     * 1. Create a producer on the topic.
+     * 2. Create a reader on the same topic, starting at the latest message, with the given subscription
+     *    name. If reader creation fails, the producer is closed (a failing close is only logged) and the
+     *    creation failure is rethrown.
+     * 3. Send {@code messageStr} and read messages until one with the same value arrives, bounded by the
+     *    configured timeout.
+     * 4. Whatever the outcome, close producer and reader via closeAndReCheck and, once that returns,
+     *    complete {@code resultFuture} with the outcome of step 3.
+     *
+     * Any failure of the chain also completes {@code resultFuture} exceptionally.
+     *
+     * NOTE(review): the returned stage completes as soon as step 3 finishes and does not wait for the
+     * cleanup of step 4, so the caller's handlers may complete {@code resultFuture} before the
+     * producer and reader are closed - confirm this is intended.
+     *
+     * @param clientAppId      identifier of the requesting client, used in log messages
+     * @param topicName        heartbeat topic to produce to and read from
+     * @param subscriptionName subscription name used by the reader
+     * @param messageStr       payload to send and to wait for
+     * @param resultFuture     future reporting the health check result to the caller
+     * @return a future for the round trip; failures are reported through {@code resultFuture} and the
+     * returned future itself then completes normally with {@code null}
+     */
+    private CompletableFuture<Void> doHealthCheck(String clientAppId, String 
topicName, String subscriptionName,
+                                                  String messageStr, 
CompletableFuture<Void> resultFuture) {
+        return client.newProducer(Schema.STRING).topic(topicName).createAsync()
+                .thenCompose(producer -> 
client.newReader(Schema.STRING).topic(topicName)
+                        .subscriptionName(subscriptionName)
+                        .startMessageId(MessageId.latest)
+                        .createAsync().exceptionally(createException -> {
+                            producer.closeAsync().exceptionally(ex -> {
+                                log.error("[{}] Close producer fail while 
heath check.", clientAppId);
+                                return null;
+                            });
+                            throw 
FutureUtil.wrapToCompletionException(createException);
+                        }).thenCompose(reader -> producer.sendAsync(messageStr)
+                                .thenCompose(__ -> 
FutureUtil.addTimeoutHandling(
+                                        healthCheckRecursiveReadNext(reader, 
messageStr),
+                                        timeout, 
pulsar.getBrokerService().executor(),
+                                        () -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
+                                .whenComplete((__, ex) -> {
+                                            closeAndReCheck(producer, reader, 
topicName,
+                                                    subscriptionName,
+                                                    clientAppId)
+                                                    .whenComplete((unused, 
innerEx) -> {
+                                                        if (ex != null) {
+                                                            
resultFuture.completeExceptionally(ex);
+                                                        } else {
+                                                            
resultFuture.complete(null);
+                                                        }
+                                                    });
+                                        }
+                                ))
+                ).exceptionally(ex -> {
+                    resultFuture.completeExceptionally(ex);
+                    return null;
+                });
+    }
+
+    /**
+     * Closes the producer and the reader of a health check and then re-checks whether closing succeeded.
+     *
+     * Re-check
+     * - Producer: if close fails, an error is logged to notify the user.
+     * - Reader: if close fails, an error is logged and the reader's subscription is force deleted when
+     *   it still exists on the topic.
+     *
+     * Close failures are never propagated: the returned future completes normally in that case.
+     *
+     * @param producer         Producer to close
+     * @param reader           Reader to close
+     * @param topicName        Topic the reader was attached to
+     * @param subscriptionName Subscription name used by the reader
+     * @param clientAppId      Identifier of the requesting client, used in log messages
+     * @return a future that completes once the close attempts have been processed
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+                                                    String topicName, String subscriptionName, String clientAppId) {
+        // no matter exception or success, we still need to
+        // close producer/reader
+        CompletableFuture<Void> producerFuture = producer.closeAsync();
+        CompletableFuture<Void> readerFuture = reader.closeAsync();
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+                .exceptionally(closeException -> {
+                    boolean readerCloseFailed = readerFuture.isCompletedExceptionally();
+                    if (readerCloseFailed) {
+                        // keep the cause in the log so that the failure can be diagnosed
+                        log.error("[{}] Close reader fail while health check.", clientAppId, closeException);
+                        forceDeleteSubscriptionIfPresent(topicName, subscriptionName, clientAppId);
+                    }
+                    // report the producer as well when both close operations failed
+                    if (!readerCloseFailed || producerFuture.isCompletedExceptionally()) {
+                        log.error("[{}] Close producer fail while health check.", clientAppId, closeException);
+                    }
+                    return null;
+                });
+    }
+
+    /**
+     * Force deletes the reader's subscription when it still exists after the reader failed to close.
+     * A failing deletion is only logged.
+     */
+    private void forceDeleteSubscriptionIfPresent(String topicName, String subscriptionName, String clientAppId) {
+        Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName);
+        if (!topic.isPresent()) {
+            return;
+        }
+        // re-check subscription after reader close
+        Subscription subscription = topic.get().getSubscription(subscriptionName);
+        if (subscription == null) {
+            return;
+        }
+        log.warn("[{}] Force delete subscription {} when it still exists after the reader is closed.",
+                clientAppId, subscription);
+        subscription.deleteForcefully()
+                .exceptionally(ex -> {
+                    log.error("[{}] Force delete subscription fail while health check", clientAppId, ex);
+                    return null;
+                });
+    }
+
+    /**
+     * Reads messages from the reader until one whose value equals {@code content} is received.
+     *
+     * @param reader  reader to pull messages from
+     * @param content value of the message to wait for
+     * @return a future that completes once the matching message has been read, or completes
+     * exceptionally when a read fails
+     */
+    private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+        return reader.readNextAsync()
+                .thenCompose(message -> Objects.equals(content, message.getValue())
+                        ? CompletableFuture.<Void>completedFuture(null)
+                        : healthCheckRecursiveReadNext(reader, content));
+    }
+
+    /**
+     * Deletes the v1 and then the v2 heartbeat topic, logging before and after.
+     */
+    private void deleteHeartbeatTopics() {
+        log.info("forcefully deleting heartbeat topics");
+        for (String heartbeatTopic : new String[] {heartbeatTopicV1, heartbeatTopicV2}) {
+            deleteTopic(heartbeatTopic);
+        }
+        log.info("finish forcefully deleting heartbeat topics");
+    }
+
+    /**
+     * Deletes the given heartbeat topic and blocks until the deletion has finished.
+     * Failures never propagate: they are logged, except when the cause shows that the topic metadata
+     * was not found, which is ignored silently.
+     *
+     * @param topicName name of the heartbeat topic to delete (v1 or v2)
+     */
+    private void deleteTopic(String topicName) {
+        try {
+            pulsar.getBrokerService().deleteTopic(topicName, true).get();
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so that the caller can still observe the interruption
+            Thread.currentThread().interrupt();
+            log.error("Errors in deleting heartbeat topic [{}]", topicName, e);
+        } catch (Exception e) {
+            Throwable realCause = e.getCause();
+            if (!(realCause instanceof ManagedLedgerException.MetadataNotFoundException
+                    || realCause instanceof MetadataStoreException.NotFoundException)) {
+                log.error("Errors in deleting heartbeat topic [{}]", topicName, e);
+            }
+        }
+    }
+
+    /**
+     * Shuts the health checker down.
+     *
+     * Steps, in order: shut down both executor providers, close the dedicated client, fail every
+     * health check that is still pending with an AlreadyClosedException and finally delete the
+     * heartbeat topics. A failure in one of the first three steps is logged as a warning and does not
+     * stop the following steps.
+     *
+     * The method holds this instance's monitor for its whole duration, which is the same monitor that
+     * guards the pending set in addToPending and removeFromPending.
+     */
+    @Override
+    public synchronized void close() throws Exception {
+        try {
+            scheduledExecutorProvider.shutdownNow();
+        } catch (Exception e) {
+            log.warn("Failed to shutdown scheduled executor", e);
+        }
+        try {
+            lookupExecutor.shutdownNow();
+        } catch (Exception e) {
+            log.warn("Failed to shutdown lookup executor", e);
+        }
+        try {
+            client.close();
+        } catch (PulsarClientException e) {
+            log.warn("Failed to close pulsar client", e);
+        }
+        // Iterate over a copy: completing a future runs the callback registered in checkHealth, which
+        // removes that future from pendingFutures.
+        for (CompletableFuture<Void> pendingFuture : new 
ArrayList<>(pendingFutures)) {
+            if (!pendingFuture.isDone()) {
+                pendingFuture.completeExceptionally(
+                        new 
PulsarClientException.AlreadyClosedException("HealthChecker is closed"));
+            }
+        }
+        // Blocks until both heartbeat topics have been deleted (or the deletion has failed).
+        deleteHeartbeatTopics();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index 618e023ccbf..b39f1f955a3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static 
org.apache.pulsar.broker.admin.impl.BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
+import static 
org.apache.pulsar.broker.service.HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Field;
@@ -33,9 +34,9 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.HealthChecker;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -231,17 +232,23 @@ public class AdminApiHealthCheckTest extends 
MockedPulsarServiceBaseTest {
     public void testHealthCheckTimeOut() throws Exception {
         final String testHealthCheckTopic = 
String.format("persistent://pulsar/localhost:%s/healthcheck",
                 pulsar.getConfig().getWebServicePort().get());
-        PulsarClient client = pulsar.getClient();
+        HealthChecker healthChecker = pulsar.getHealthChecker();
+        Field clientField = HealthChecker.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        PulsarClient client = (PulsarClient) clientField.get(healthChecker);
         PulsarClient spyClient = Mockito.spy(client);
         Mockito.doReturn(new DummyProducerBuilder<>((PulsarClientImpl) 
spyClient, Schema.BYTES))
                 .when(spyClient).newProducer(Schema.STRING);
-        // use reflection to replace the client in the broker
-        Field field = PulsarService.class.getDeclaredField("client");
-        field.setAccessible(true);
-        field.set(pulsar, spyClient);
+        clientField.set(healthChecker, spyClient);
+
+        // change timeout to 1 second to speed up test
+        Field timeoutField = HealthChecker.class.getDeclaredField("timeout");
+        timeoutField.setAccessible(true);
+        timeoutField.set(healthChecker, Duration.ofSeconds(1));
+
         try {
             admin.brokers().healthcheck(TopicVersion.V2);
-            throw new Exception("Should not reach here");
+            fail("Should not reach here");
         } catch (PulsarAdminException e) {
             log.info("Exception caught", e);
             assertTrue(e.getMessage().contains("LowOverheadTimeoutException"));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index a2401ebe19a..ae431312f27 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -34,9 +34,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
-import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.HealthChecker;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
@@ -167,7 +167,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
     public void testHealthCheckTopicNotOffload() throws Exception {
         NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
-        TopicName topicName = TopicName.get("persistent", namespaceName, 
BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+        TopicName topicName = TopicName.get("persistent", namespaceName, 
HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX);
         PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
                 .getTopic(topicName.toString(), true).get().get();
         ManagedLedgerConfig config = 
persistentTopic.getManagedLedger().getConfig();
@@ -193,7 +193,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         Assert.assertTrue(optionalTopic.isEmpty());
 
         TopicName heartbeatTopicName = TopicName.get("persistent",
-                namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+                namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX);
         admin.topics().getRetention(heartbeatTopicName.toString());
         optionalTopic = pulsar.getBrokerService()
                 .getTopic(topicName.getPartition(1).toString(), false).join();
@@ -220,7 +220,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         admin.brokers().healthcheck(TopicVersion.V2);
         NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
-        TopicName heartbeatTopicName = TopicName.get("persistent", 
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+        TopicName heartbeatTopicName = TopicName.get("persistent", 
namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX);
 
         List<String> topics = 
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
         Assert.assertEquals(topics.size(), 1);
@@ -245,7 +245,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         List<String> topics = 
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
         Assert.assertEquals(topics.size(), 1);
         TopicName heartbeatTopicName = TopicName.get("persistent",
-                namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+                namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX);
         Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 7860b0708e3..f7d82d673b3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
@@ -116,13 +117,16 @@ class NonStartableTestPulsarService extends 
AbstractTestPulsarService {
     }
 
     @Override
-    public PulsarClientImpl createClientImpl(ClientConfigurationData 
clientConf) throws PulsarClientException {
+    public PulsarClientImpl createClientImpl(ClientConfigurationData 
clientConf,
+                                             
Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
+            throws PulsarClientException {
         try {
             return (PulsarClientImpl) getClient();
         } catch (PulsarServerException e) {
             throw new PulsarClientException(e);
         }
     }
+
     @Override
     protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
         return getBrokerService();

Reply via email to