This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d9e5c03a995ddaff01917a3877e84a5ea30927a9 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Thu Apr 24 19:11:51 2025 +0300 [fix][broker] Fix broker shutdown delay by resolving hanging health checks (#24210) (cherry picked from commit 12961caf4967b03634bb443d4718eb30a2a1f0ae) --- .../org/apache/pulsar/broker/PulsarService.java | 271 ++++++++++------- .../pulsar/broker/admin/impl/BrokersBase.java | 149 +--------- .../pulsar/broker/service/HealthChecker.java | 330 +++++++++++++++++++++ .../broker/admin/AdminApiHealthCheckTest.java | 23 +- .../systopic/PartitionedSystemTopicTest.java | 10 +- .../testcontext/NonStartableTestPulsarService.java | 6 +- 6 files changed, 531 insertions(+), 258 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index cb63d1db71d..308c3459065 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.HealthChecker; import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; import org.apache.pulsar.broker.service.Topic; @@ -145,6 +146,7 @@ import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.protocol.schema.SchemaStorage; @@ -300,6 +302,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private volatile CompletableFuture<Void> closeFuture; // key is listener name, value is pulsar address and pulsar ssl address private Map<String, AdvertisedListener> advertisedListeners; + private volatile HealthChecker healthChecker; public PulsarService(ServiceConfiguration config) { this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. " @@ -476,6 +479,11 @@ public class PulsarService implements AutoCloseable, ShutdownService { // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; + if (healthChecker != null) { + healthChecker.close(); + healthChecker = null; + } + // close the service in reverse order v.s. in which they are started if (this.resourceUsageTransportManager != null) { try { @@ -1609,76 +1617,36 @@ public class PulsarService implements AutoCloseable, ShutdownService { return this.offloaderReadExecutor; } - public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) + public PulsarClientImpl createClientImpl(ClientConfigurationData conf) throws PulsarClientException { + return createClientImpl(conf, null); + } + + public PulsarClientImpl createClientImpl(Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer) + throws PulsarClientException { + return createClientImpl(null, customizer); + } + + public PulsarClientImpl createClientImpl(ClientConfigurationData conf, + Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer) throws PulsarClientException { - return PulsarClientImpl.builder() - .conf(clientConf) + PulsarClientImpl.PulsarClientImplBuilder pulsarClientImplBuilder = PulsarClientImpl.builder() + .conf(conf != null ? conf : createClientConfigurationData()) .eventLoopGroup(ioEventLoopGroup) .timer(brokerClientSharedTimer) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) - .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) - .build(); + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider); + if (customizer != null) { + customizer.accept(pulsarClientImplBuilder); + } + return pulsarClientImplBuilder.build(); } public synchronized PulsarClient getClient() throws PulsarServerException { if (this.client == null) { try { - ClientConfigurationData initialConf = new ClientConfigurationData(); - - // Disable memory limit for broker client and disable stats - initialConf.setMemoryLimitBytes(0); - initialConf.setStatsIntervalSeconds(0); - - // Apply all arbitrary configuration. This must be called before setting any fields annotated as - // @Secret on the ClientConfigurationData object because of the way they are serialized. - // See https://github.com/apache/pulsar/issues/8509 for more information. - Map<String, Object> overrides = PropertiesUtils - .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_"); - ClientConfigurationData conf = - ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class); - - // Disabled auto release useless connections - // The automatic release connection feature is not yet perfect for transaction scenarios, so turn it - // off first. - conf.setConnectionMaxIdleSeconds(-1); - - boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled(); - conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl); - - if (tlsEnabled) { - conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers()); - conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols()); - conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection()); - conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled()); - if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) { - conf.setUseKeyStoreTls(true); - conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType()); - conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore()); - conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword()); - conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType()); - conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore()); - conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword()); - } else { - conf.setTlsTrustCertsFilePath( - isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath()) - ? this.getConfiguration().getBrokerClientTrustCertsFilePath() - : this.getConfiguration().getTlsTrustCertsFilePath()); - conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath()); - conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath()); - } - } - - if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) { - conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin()); - conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters()); - conf.setAuthParamMap(null); - conf.setAuthentication(AuthenticationFactory.create( - this.getConfiguration().getBrokerClientAuthenticationPlugin(), - this.getConfiguration().getBrokerClientAuthenticationParameters())); - } - this.client = createClientImpl(conf); + this.client = createClientImpl(null, null); } catch (Exception e) { throw new PulsarServerException(e); } @@ -1686,59 +1654,120 @@ public class PulsarService implements AutoCloseable, ShutdownService { return this.client; } + protected ClientConfigurationData createClientConfigurationData() + throws PulsarClientException.UnsupportedAuthenticationException { + ClientConfigurationData initialConf = new ClientConfigurationData(); + + // Disable memory limit for broker client and disable stats + initialConf.setMemoryLimitBytes(0); + initialConf.setStatsIntervalSeconds(0); + + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + Map<String, Object> overrides = PropertiesUtils + .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_"); + ClientConfigurationData conf = + ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class); + + // Disabled auto release useless connections + // The automatic release connection feature is not yet perfect for transaction scenarios, so turn it + // off first. + conf.setConnectionMaxIdleSeconds(-1); + + boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled(); + conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl); + + if (tlsEnabled) { + conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers()); + conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols()); + conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection()); + conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled()); + if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) { + conf.setUseKeyStoreTls(true); + conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType()); + conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore()); + conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword()); + conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType()); + conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore()); + conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword()); + } else { + conf.setTlsTrustCertsFilePath( + isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath()) + ? this.getConfiguration().getBrokerClientTrustCertsFilePath() + : this.getConfiguration().getTlsTrustCertsFilePath()); + conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath()); + conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath()); + } + } + + if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) { + conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin()); + conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters()); + conf.setAuthParamMap(null); + conf.setAuthentication(AuthenticationFactory.create( + this.getConfiguration().getBrokerClientAuthenticationPlugin(), + this.getConfiguration().getBrokerClientAuthenticationParameters())); + } + return conf; + } + public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { if (this.adminClient == null) { try { - ServiceConfiguration conf = this.getConfiguration(); - final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress; - if (adminApiUrl == null) { - throw new IllegalArgumentException("Web service address was not set properly " - + ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled() - + ", webServiceAddressTls: " + webServiceAddressTls - + ", webServiceAddress: " + webServiceAddress); - } - PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); - - // Apply all arbitrary configuration. This must be called before setting any fields annotated as - // @Secret on the ClientConfigurationData object because of the way they are serialized. - // See https://github.com/apache/pulsar/issues/8509 for more information. - builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_")); + this.adminClient = getCreateAdminClientBuilder().build(); + LOG.info("created admin with url {} ", adminClient.getServiceUrl()); + } catch (Exception e) { + throw new PulsarServerException(e); + } + } + return this.adminClient; + } - builder.authentication( - conf.getBrokerClientAuthenticationPlugin(), - conf.getBrokerClientAuthenticationParameters()); - - if (conf.isBrokerClientTlsEnabled()) { - builder.tlsCiphers(config.getBrokerClientTlsCiphers()) - .tlsProtocols(config.getBrokerClientTlsProtocols()); - if (conf.isBrokerClientTlsEnabledWithKeyStore()) { - builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType()) - .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore()) - .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword()) - .tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType()) - .tlsKeyStorePath(conf.getBrokerClientTlsKeyStore()) - .tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword()); - } else { - builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()) - .tlsKeyFilePath(conf.getBrokerClientKeyFilePath()) - .tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath()); - } - builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()) - .enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled()); - } + protected PulsarAdminBuilder getCreateAdminClientBuilder() + throws PulsarClientException.UnsupportedAuthenticationException { + ServiceConfiguration conf = this.getConfiguration(); + final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress; + if (adminApiUrl == null) { + throw new IllegalArgumentException("Web service address was not set properly " + + ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled() + + ", webServiceAddressTls: " + webServiceAddressTls + + ", webServiceAddress: " + webServiceAddress); + } + PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); - // most of the admin request requires to make zk-call so, keep the max read-timeout based on - // zk-operation timeout - builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_")); - this.adminClient = builder.build(); - LOG.info("created admin with url {} ", adminApiUrl); - } catch (Exception e) { - throw new PulsarServerException(e); + builder.authentication( + conf.getBrokerClientAuthenticationPlugin(), + conf.getBrokerClientAuthenticationParameters()); + + if (conf.isBrokerClientTlsEnabled()) { + builder.tlsCiphers(config.getBrokerClientTlsCiphers()) + .tlsProtocols(config.getBrokerClientTlsProtocols()); + if (conf.isBrokerClientTlsEnabledWithKeyStore()) { + builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType()) + .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore()) + .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword()) + .tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType()) + .tlsKeyStorePath(conf.getBrokerClientTlsKeyStore()) + .tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword()); + } else { + builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()) + .tlsKeyFilePath(conf.getBrokerClientKeyFilePath()) + .tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath()); } + builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()) + .enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled()); } - return this.adminClient; + // most of the admin request requires to make zk-call so, keep the max read-timeout based on + // zk-operation timeout + builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + return builder; } /** @@ -2069,4 +2098,40 @@ public class PulsarService implements AutoCloseable, ShutdownService { mutex.unlock(); } } + + /** + * Run health check for the broker. + * + * @return CompletableFuture + */ + public CompletableFuture<Void> runHealthCheck(TopicVersion topicVersion, String clientId) { + if (!isRunning()) { + return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running")); + } + HealthChecker localHealthChecker = getHealthChecker(); + if (localHealthChecker == null) { + return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running")); + } + return localHealthChecker.checkHealth(topicVersion, clientId); + } + + @VisibleForTesting + public HealthChecker getHealthChecker() { + if (healthChecker == null) { + synchronized (this) { + if (healthChecker == null) { + if (!isRunning()) { + return null; + } + try { + healthChecker = new HealthChecker(this); + } catch (PulsarServerException e) { + LOG.error("Failed to create health checker", e); + throw new RuntimeException(e); + } + } + } + } + return healthChecker; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 3b5bc66f5d1..ea25367ca3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -26,16 +26,11 @@ import io.swagger.annotations.ApiResponses; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -50,22 +45,12 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LeaderBroker; -import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.Subscription; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.BrokerOperation; @@ -80,16 +65,9 @@ import org.slf4j.LoggerFactory; */ public class BrokersBase extends AdminResource { private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); - public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck"; // log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; - // there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout - // a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception. - // or we can't propagate the server timeout exception to the client. - private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58); - private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION = - FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)"); private static volatile long threadDumpLoggedTimestamp; @GET @@ -384,16 +362,21 @@ public class BrokersBase extends AdminResource { @ApiResponse(code = 307, message = "Current broker is not the target broker"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), - @ApiResponse(code = 500, message = "Internal server error")}) + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Service unavailable")}) public void healthCheck(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Topic Version") @QueryParam("topicVersion") TopicVersion topicVersion, @QueryParam("brokerId") String brokerId) { + if (pulsar().getState() == State.Closed || pulsar().getState() == State.Closing) { + asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE).build()); + return; + } validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK) - .thenAccept(__ -> checkDeadlockedThreads()) .thenCompose(__ -> maybeRedirectToBroker( StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId)) + .thenAccept(__ -> checkDeadlockedThreads()) .thenCompose(__ -> internalRunHealthCheck(topicVersion)) .thenAccept(__ -> { LOG.info("[{}] Successfully run health check.", clientAppId()); @@ -431,124 +414,8 @@ public class BrokersBase extends AdminResource { } } - private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) { - String brokerId = pulsar().getBrokerId(); - NamespaceName namespaceName = (topicVersion == TopicVersion.V2) - ? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration()) - : NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration()); - final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); - LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName); - final String messageStr = UUID.randomUUID().toString(); - final String subscriptionName = "healthCheck-" + messageStr; - // create non-partitioned topic manually and close the previous reader if present. - return pulsar().getBrokerService().getTopic(topicName, true) - .thenCompose(topicOptional -> { - if (!topicOptional.isPresent()) { - LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", - clientAppId(), topicName); - throw new RestException(Status.NOT_FOUND, - String.format("Topic [%s] not found after create.", topicName)); - } - PulsarClient client; - try { - client = pulsar().getClient(); - } catch (PulsarServerException e) { - LOG.error("[{}] Fail to run health check while get client.", clientAppId()); - throw new RestException(e); - } - CompletableFuture<Void> resultFuture = new CompletableFuture<>(); - client.newProducer(Schema.STRING).topic(topicName).createAsync() - .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName) - .subscriptionName(subscriptionName) - .startMessageId(MessageId.latest) - .createAsync().exceptionally(createException -> { - producer.closeAsync().exceptionally(ex -> { - LOG.error("[{}] Close producer fail while heath check.", clientAppId()); - return null; - }); - throw FutureUtil.wrapToCompletionException(createException); - }).thenCompose(reader -> producer.sendAsync(messageStr) - .thenCompose(__ -> FutureUtil.addTimeoutHandling( - healthCheckRecursiveReadNext(reader, messageStr), - HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(), - () -> HEALTH_CHECK_TIMEOUT_EXCEPTION)) - .whenComplete((__, ex) -> { - closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName) - .whenComplete((unused, innerEx) -> { - if (ex != null) { - resultFuture.completeExceptionally(ex); - } else { - resultFuture.complete(null); - } - }); - } - )) - ).exceptionally(ex -> { - resultFuture.completeExceptionally(ex); - return null; - }); - return resultFuture; - }); - } - - /** - * Close producer and reader and then to re-check if this operation is success. - * - * Re-check - * - Producer: If close fails we will print error log to notify user. - * - Consumer: If close fails we will force delete subscription. - * - * @param producer Producer - * @param reader Reader - * @param topic Topic - * @param subscriptionName Subscription name - */ - private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, - Topic topic, String subscriptionName) { - // no matter exception or success, we still need to - // close producer/reader - CompletableFuture<Void> producerFuture = producer.closeAsync(); - CompletableFuture<Void> readerFuture = reader.closeAsync(); - List<CompletableFuture<Void>> futures = new ArrayList<>(2); - futures.add(producerFuture); - futures.add(readerFuture); - return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) - .exceptionally(closeException -> { - if (readerFuture.isCompletedExceptionally()) { - LOG.error("[{}] Close reader fail while heath check.", clientAppId()); - Subscription subscription = - topic.getSubscription(subscriptionName); - // re-check subscription after reader close - if (subscription != null) { - LOG.warn("[{}] Force delete subscription {} " - + "when it still exists after the" - + " reader is closed.", - clientAppId(), subscription); - subscription.deleteForcefully() - .exceptionally(ex -> { - LOG.error("[{}] Force delete subscription fail" - + " while health check", - clientAppId(), ex); - return null; - }); - } - } else { - // producer future fail. - LOG.error("[{}] Close producer fail while heath check.", clientAppId()); - } - return null; - }); - } - - private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) { - return reader.readNextAsync() - .thenCompose(msg -> { - if (!Objects.equals(content, msg.getValue())) { - return healthCheckRecursiveReadNext(reader, content); - } - return CompletableFuture.completedFuture(null); - }); + return pulsar().runHealthCheck(topicVersion, clientAppId()); } private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java new file mode 100644 index 00000000000..e80e0b09475 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HealthChecker.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.util.ScheduledExecutorProvider; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicVersion; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; + +/** + * The HealthChecker class provides functionality to monitor and verify the health of a Pulsar broker. + * It performs health checks by creating test topics, producing and consuming messages to verify broker functionality. + * This class implements AutoCloseable to ensure proper cleanup of resources when the broker is shut down. + */ +@Slf4j +public class HealthChecker implements AutoCloseable{ + /** + * Suffix used for health check topic names. + */ + public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck"; + /** + * Timeout duration for health check operations. + * Set to 58 seconds to be shorter than the client's default 60-second timeout, + * allowing server timeout exceptions to propagate properly to the client. + */ + private static final Duration DEFAULT_HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58); + /** + * Pre-created timeout exception for health check operations. + */ + private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION = + FutureUtil.createTimeoutException("Timeout", HealthChecker.class, "healthCheckRecursiveReadNext(...)"); + /** + * Reference to the main Pulsar service. + */ + private final PulsarService pulsar; + /** + * Topic name for v1 heartbeat checks. + */ + private final String heartbeatTopicV1; + /** + * Topic name for v2 heartbeat checks. + */ + private final String heartbeatTopicV2; + /** + * Pulsar client instance for health check operations. + * A separate client is needed so that it can be shutdown before the webservice is closed. + * Pending requests for healthchecks to the /health endpoint can be cancelled this way. + */ + private final PulsarClient client; + /** + * Executor for lookup operations. + * This is also needed so that pending healthchecks can be properly cancelled at shutdown. + */ + private final ScheduledExecutorProvider lookupExecutor; + /** + * Executor for scheduled tasks. + * This is also needed so that pending healthchecks can be properly cancelled at shutdown. + */ + private final ScheduledExecutorProvider scheduledExecutorProvider; + /** + * Set of pending health check operations. + */ + private final Set<CompletableFuture<Void>> pendingFutures = new HashSet<>(); + + private final Duration timeout = DEFAULT_HEALTH_CHECK_READ_TIMEOUT; + + public HealthChecker(PulsarService pulsar) throws PulsarServerException { + this.pulsar = pulsar; + this.heartbeatTopicV1 = getHeartbeatTopicName(pulsar.getBrokerId(), pulsar.getConfiguration(), false); + this.heartbeatTopicV2 = getHeartbeatTopicName(pulsar.getBrokerId(), pulsar.getConfiguration(), true); + this.lookupExecutor = + new ScheduledExecutorProvider(1, "health-checker-client-lookup-executor"); + this.scheduledExecutorProvider = + new ScheduledExecutorProvider(1, "health-checker-client-scheduled-executor"); + try { + this.client = pulsar.createClientImpl(builder -> { + builder.lookupExecutorProvider(lookupExecutor); + builder.scheduledExecutorProvider(scheduledExecutorProvider); + }); + } catch (PulsarClientException e) { + throw new PulsarServerException("Error creating client for HealthChecker", e); + } + } + + private static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) { + NamespaceName namespaceName = isV2 + ? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration) + : NamespaceService.getHeartbeatNamespace(brokerId, configuration); + return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); + } + + /** + * Performs a health check on the broker by verifying message production and consumption. + * The health check process includes: + * 1. Producing a test message + * 2. Reading the message back to verify end-to-end functionality + * + * @param topicVersion The version of the topic to use (V1 or V2) + * @param clientAppId The identifier of the client application requesting the health check + * @return A CompletableFuture that completes when the health check is successful, or completes exceptionally if the + * check fails + */ + public CompletableFuture<Void> checkHealth(TopicVersion topicVersion, String clientAppId) { + final String topicName = topicVersion == TopicVersion.V2 ? heartbeatTopicV2 : heartbeatTopicV1; + log.info("[{}] Running healthCheck with topic={}", clientAppId, topicName); + final String messageStr = UUID.randomUUID().toString(); + final String subscriptionName = "healthCheck-" + messageStr; + // create non-partitioned topic manually and close the previous reader if present. + CompletableFuture<Void> resultFuture = new CompletableFuture<>(); + addToPending(resultFuture); + resultFuture.whenComplete((result, ex) -> { + removeFromPending(resultFuture); + }); + try { + pulsar.getBrokerService().getTopic(topicName, true) + .thenCompose(topicOptional -> { + if (!topicOptional.isPresent()) { + log.error("[{}] Fail to run health check while get topic {}. because get null value.", + clientAppId, topicName); + return CompletableFuture.failedFuture(new BrokerServiceException.TopicNotFoundException( + String.format("Topic [%s] not found after create.", topicName))); + } + return doHealthCheck(clientAppId, topicName, subscriptionName, messageStr, resultFuture); + }).whenComplete((result, t) -> { + if (t != null) { + resultFuture.completeExceptionally(t); + } else { + if (!resultFuture.isDone()) { + resultFuture.complete(null); + } + } + }); + } catch (Exception e) { + log.error("[{}] Fail to run health check while get topic {}. because get exception.", + clientAppId, topicName, e); + resultFuture.completeExceptionally(e); + } + return resultFuture; + } + + private synchronized void addToPending(CompletableFuture<Void> resultFuture) { + pendingFutures.add(resultFuture); + } + + private synchronized void removeFromPending(CompletableFuture<Void> resultFuture) { + pendingFutures.remove(resultFuture); + } + + private CompletableFuture<Void> doHealthCheck(String clientAppId, String topicName, String subscriptionName, + String messageStr, CompletableFuture<Void> resultFuture) { + return client.newProducer(Schema.STRING).topic(topicName).createAsync() + .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName) + .subscriptionName(subscriptionName) + .startMessageId(MessageId.latest) + .createAsync().exceptionally(createException -> { + producer.closeAsync().exceptionally(ex -> { + log.error("[{}] Close producer fail while heath check.", clientAppId); + return null; + }); + throw FutureUtil.wrapToCompletionException(createException); + }).thenCompose(reader -> producer.sendAsync(messageStr) + .thenCompose(__ -> FutureUtil.addTimeoutHandling( + healthCheckRecursiveReadNext(reader, messageStr), + timeout, pulsar.getBrokerService().executor(), + () -> HEALTH_CHECK_TIMEOUT_EXCEPTION)) + .whenComplete((__, ex) -> { + closeAndReCheck(producer, reader, topicName, + subscriptionName, + clientAppId) + .whenComplete((unused, innerEx) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + } else { + resultFuture.complete(null); + } + }); + } + )) + ).exceptionally(ex -> { + resultFuture.completeExceptionally(ex); + return null; + }); + } + + /** + * Close producer and reader and then to re-check if this operation is success. + * + * Re-check + * - Producer: If close fails we will print error log to notify user. + * - Consumer: If close fails we will force delete subscription. + * + * @param producer Producer + * @param reader Reader + * @param subscriptionName Subscription name + */ + private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, + String topicName, String subscriptionName, String clientAppId) { + // no matter exception or success, we still need to + // close producer/reader + CompletableFuture<Void> producerFuture = producer.closeAsync(); + CompletableFuture<Void> readerFuture = reader.closeAsync(); + List<CompletableFuture<Void>> futures = new ArrayList<>(2); + futures.add(producerFuture); + futures.add(readerFuture); + return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) + .exceptionally(closeException -> { + if (readerFuture.isCompletedExceptionally()) { + log.error("[{}] Close reader fail while health check.", clientAppId); + Optional<Topic> topic = pulsar.getBrokerService().getTopicReference(topicName); + if (topic.isPresent()) { + Subscription subscription = + topic.get().getSubscription(subscriptionName); + // re-check subscription after reader close + if (subscription != null) { + log.warn("[{}] Force delete subscription {} " + + "when it still exists after the" + + " reader is closed.", + clientAppId, subscription); + subscription.deleteForcefully() + .exceptionally(ex -> { + log.error("[{}] Force delete subscription fail" + + " while health check", + clientAppId, ex); + return null; + }); + } + } + } else { + // producer future fail. + log.error("[{}] Close producer fail while heath check.", clientAppId); + } + return null; + }); + } + + private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) { + return reader.readNextAsync() + .thenCompose(msg -> { + if (!Objects.equals(content, msg.getValue())) { + return healthCheckRecursiveReadNext(reader, content); + } + return CompletableFuture.completedFuture(null); + }); + } + + private void deleteHeartbeatTopics() { + log.info("forcefully deleting heartbeat topics"); + deleteTopic(heartbeatTopicV1); + deleteTopic(heartbeatTopicV2); + log.info("finish forcefully deleting heartbeat topics"); + } + + private void deleteTopic(String heartbeatTopicV1) { + try { + pulsar.getBrokerService().deleteTopic(heartbeatTopicV1, true).get(); + } catch (Exception e) { + Throwable realCause = e.getCause(); + if (!(realCause instanceof ManagedLedgerException.MetadataNotFoundException + || realCause instanceof MetadataStoreException.NotFoundException)) { + log.error("Errors in deleting heartbeat topic [{}]", heartbeatTopicV1, e); + } + } + } + + @Override + public synchronized void close() throws Exception { + try { + scheduledExecutorProvider.shutdownNow(); + } catch (Exception e) { + log.warn("Failed to shutdown scheduled executor", e); + } + try { + lookupExecutor.shutdownNow(); + } catch (Exception e) { + log.warn("Failed to shutdown lookup executor", e); + } + try { + client.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close pulsar client", e); + } + for (CompletableFuture<Void> pendingFuture : new ArrayList<>(pendingFutures)) { + if (!pendingFuture.isDone()) { + pendingFuture.completeExceptionally( + new PulsarClientException.AlreadyClosedException("HealthChecker is closed")); + } + } + deleteHeartbeatTopics(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java index 618e023ccbf..b39f1f955a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java @@ -18,10 +18,11 @@ */ package org.apache.pulsar.broker.admin; -import static org.apache.pulsar.broker.admin.impl.BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX; +import static org.apache.pulsar.broker.service.HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; import java.lang.reflect.Field; @@ -33,9 +34,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.HealthChecker; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -231,17 +232,23 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest { public void testHealthCheckTimeOut() throws Exception { final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck", pulsar.getConfig().getWebServicePort().get()); - PulsarClient client = pulsar.getClient(); + HealthChecker healthChecker = pulsar.getHealthChecker(); + Field clientField = HealthChecker.class.getDeclaredField("client"); + clientField.setAccessible(true); + PulsarClient client = (PulsarClient) clientField.get(healthChecker); PulsarClient spyClient = Mockito.spy(client); Mockito.doReturn(new DummyProducerBuilder<>((PulsarClientImpl) spyClient, Schema.BYTES)) .when(spyClient).newProducer(Schema.STRING); - // use reflection to replace the client in the broker - Field field = PulsarService.class.getDeclaredField("client"); - field.setAccessible(true); - field.set(pulsar, spyClient); + clientField.set(healthChecker, spyClient); + + // change timeout to 1 second to speed up test + Field timeoutField = HealthChecker.class.getDeclaredField("timeout"); + timeoutField.setAccessible(true); + timeoutField.set(healthChecker, Duration.ofSeconds(1)); + try { admin.brokers().healthcheck(TopicVersion.V2); - throw new Exception("Should not reach here"); + fail("Should not reach here"); } catch (PulsarAdminException e) { log.info("Exception caught", e); assertTrue(e.getMessage().contains("LowOverheadTimeoutException")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index a2401ebe19a..ae431312f27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -34,9 +34,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang3.reflect.MethodUtils; -import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.HealthChecker; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.SchemaRegistry; @@ -167,7 +167,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { public void testHealthCheckTopicNotOffload() throws Exception { NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig()); - TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + TopicName topicName = TopicName.get("persistent", namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() .getTopic(topicName.toString(), true).get().get(); ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig(); @@ -193,7 +193,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { Assert.assertTrue(optionalTopic.isEmpty()); TopicName heartbeatTopicName = TopicName.get("persistent", - namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX); admin.topics().getRetention(heartbeatTopicName.toString()); optionalTopic = pulsar.getBrokerService() .getTopic(topicName.getPartition(1).toString(), false).join(); @@ -220,7 +220,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { admin.brokers().healthcheck(TopicVersion.V2); NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig()); - TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX); List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); Assert.assertEquals(topics.size(), 1); @@ -245,7 +245,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); Assert.assertEquals(topics.size(), 1); TopicName heartbeatTopicName = TopicName.get("persistent", - namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + namespaceName, HealthChecker.HEALTH_CHECK_TOPIC_SUFFIX); Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index 7860b0708e3..f7d82d673b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.apache.pulsar.broker.BookKeeperClientFactory; @@ -116,13 +117,16 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService { } @Override - public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException { + public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf, + Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer) + throws PulsarClientException { try { return (PulsarClientImpl) getClient(); } catch (PulsarServerException e) { throw new PulsarClientException(e); } } + @Override protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { return getBrokerService();