This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d5303514ef [cleanup][broker] Various cleanups (#20658)
8d5303514ef is described below
commit 8d5303514efa6cf0fa98c7285339a11821c7cf79
Author: Christophe Bornet <[email protected]>
AuthorDate: Fri Jun 30 14:35:21 2023 +0200
[cleanup][broker] Various cleanups (#20658)
---
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 3 +-
.../pulsar/broker/ManagedLedgerClientFactory.java | 18 +--
.../org/apache/pulsar/broker/PulsarService.java | 51 ++++---
.../broker/TransactionMetadataStoreService.java | 32 ++--
.../pulsar/broker/loadbalance/LoadManager.java | 17 +--
.../pulsar/broker/namespace/NamespaceService.java | 169 ++++++++++-----------
.../pulsar/broker/web/PulsarWebResource.java | 27 ++--
.../apache/pulsar/broker/web/RequestWrapper.java | 2 +-
.../pulsar/broker/web/ResponseHandlerFilter.java | 8 +-
.../apache/pulsar/broker/web/RestException.java | 4 +-
.../org/apache/pulsar/broker/web/WebService.java | 4 +-
11 files changed, 152 insertions(+), 183 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0259dfc7a58..0ecca755956 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -49,7 +49,6 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
-@SuppressWarnings("deprecation")
@Slf4j
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
@@ -71,7 +70,7 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
if (properties != null) {
- properties.forEach((key, value) -> bkConf.setProperty(key, value));
+ properties.forEach(bkConf::setProperty);
}
if (ensemblePlacementPolicyClass.isPresent()) {
setEnsemblePlacementPolicy(bkConf, conf, store,
ensemblePlacementPolicyClass.get());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index d86649abd3c..51fb8bc1ae3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -158,17 +158,15 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper
client", ree);
}
- if (bkEnsemblePolicyToBkClientMap != null) {
- bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
- try {
- if (bk != null) {
- bk.close();
- }
- } catch (Exception e) {
- log.warn("Failed to close bookkeeper-client for policy
{}", policy, e);
+ bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+ try {
+ if (bk != null) {
+ bk.close();
}
- });
- }
+ } catch (Exception e) {
+ log.warn("Failed to close bookkeeper-client for policy
{}", policy, e);
+ }
+ });
log.info("Closed BookKeeper client");
} catch (Exception e) {
log.warn(e.getMessage(), e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2b64a8cfb5c..3d1254eec42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -185,13 +185,12 @@ import org.slf4j.LoggerFactory;
/**
* Main class for Pulsar broker service.
*/
-
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable, ShutdownService {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarService.class);
private static final double
GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
- private ServiceConfiguration config = null;
+ private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private LeaderElectionService leaderElectionService = null;
@@ -255,7 +254,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private AdditionalServlets brokerAdditionalServlets;
// packages management service
- private Optional<PackagesManagement> packagesManagement = Optional.empty();
+ private PackagesManagement packagesManagement = null;
private PulsarPrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
@@ -285,10 +284,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private Map<String, AdvertisedListener> advertisedListeners;
public PulsarService(ServiceConfiguration config) {
- this(config, Optional.empty(), (exitCode) -> {
- LOG.info("Process termination requested with code {}. "
- + "Ignoring, as this constructor is intended for
tests. ", exitCode);
- });
+ this(config, Optional.empty(), (exitCode) -> LOG.info("Process
termination requested with code {}. "
+ + "Ignoring, as this constructor is intended for tests. ",
exitCode));
}
public PulsarService(ServiceConfiguration config, Optional<WorkerService>
functionWorkerService,
@@ -370,7 +367,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
/**
* Close the session to the metadata service.
- *
+ * <p>
* This will immediately release all the resource locks held by this
broker on the coordination service.
*
* @throws Exception if the close operation fails
@@ -400,8 +397,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
throw (PulsarServerException) cause;
} else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
&& (cause instanceof TimeoutException || cause instanceof
CancellationException)) {
- // ignore shutdown timeout when timeout is 0, which is
primarily used in tests
- // to forcefully shutdown the broker
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Shutdown timeout ignored when timeout is 0, "
+ + "which is primarily used in tests to forcefully
shutdown the broker",
+ cause);
+ }
} else {
throw new PulsarServerException(cause);
}
@@ -693,7 +694,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
throw new PulsarServerException("Cannot start the service once
it was stopped");
}
- if (!config.getWebServicePort().isPresent() &&
!config.getWebServicePortTls().isPresent()) {
+ if (config.getWebServicePort().isEmpty() &&
config.getWebServicePortTls().isEmpty()) {
throw new
IllegalArgumentException("webServicePort/webServicePortTls must be present");
}
@@ -722,7 +723,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
config.getDefaultRetentionTimeInMinutes() * 60));
}
- if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+ if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty()
&& config.isLoadBalancerEnabled()
&& LinuxInfoUtils.isLinux()
&& !LinuxInfoUtils.checkHasNicSpeeds()) {
@@ -896,7 +897,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (isNotBlank(config.getResourceUsageTransportClassName())) {
Class<?> clazz =
Class.forName(config.getResourceUsageTransportClassName());
Constructor<?> ctor =
clazz.getConstructor(PulsarService.class);
- Object object = ctor.newInstance(new Object[]{this});
+ Object object = ctor.newInstance(this);
this.resourceUsageTransportManager =
(ResourceUsageTopicTransportManager) object;
}
this.resourceGroupServiceManager = new ResourceGroupService(this);
@@ -1241,7 +1242,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
* Load all the topics contained in a namespace.
*
* @param bundle <code>NamespaceBundle</code> to identify the service unit
- * @throws Exception
*/
public void loadNamespaceTopics(NamespaceBundle bundle) {
executor.submit(() -> {
@@ -1296,7 +1296,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
config.getConfigurationMetadataStoreUrl(),
new ClientConfiguration().getZkLedgersRootPath(),
config.isBookkeeperMetadataStoreSeparated() ?
config.getBookkeeperMetadataStoreUrl() : null,
- this.getWorkerConfig().map(wc ->
wc.getStateStorageServiceUrl()).orElse(null));
+
this.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
}
/**
@@ -1411,7 +1411,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
Offloaders offloaders =
offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(),
config.getNarExtractionDirectory());
- LedgerOffloaderFactory offloaderFactory =
offloaders.getOffloaderFactory(
+ LedgerOffloaderFactory<?> offloaderFactory =
offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
@@ -1699,7 +1699,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
AdvertisedListener internalListener =
ServiceConfigurationUtils.getInternalListener(config, "http");
return internalListener.getBrokerHttpUrl() != null
? internalListener.getBrokerHttpUrl().toString()
- :
webAddress(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTP().get());
+ :
webAddress(ServiceConfigurationUtils.getWebServiceAddress(config),
+ getListenPortHTTP().orElseThrow());
} else {
return null;
}
@@ -1714,7 +1715,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
AdvertisedListener internalListener =
ServiceConfigurationUtils.getInternalListener(config, "https");
return internalListener.getBrokerHttpsUrl() != null
? internalListener.getBrokerHttpsUrl().toString()
- :
webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTPS().get());
+ :
webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config),
+ getListenPortHTTPS().orElseThrow());
} else {
return null;
}
@@ -1736,7 +1738,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
public String getLookupServiceAddress() {
return String.format("%s:%s", advertisedAddress,
config.getWebServicePort().isPresent()
? config.getWebServicePort().get()
- : config.getWebServicePortTls().get());
+ : config.getWebServicePortTls().orElseThrow());
}
public TopicPoliciesService getTopicPoliciesService() {
@@ -1798,21 +1800,22 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
public PackagesManagement getPackagesManagement() throws
UnsupportedOperationException {
- return packagesManagement.orElseThrow(() -> new
UnsupportedOperationException("Package Management Service "
- + "is not enabled in the broker."));
+ if (packagesManagement == null) {
+ throw new UnsupportedOperationException("Package Management
Service is not enabled in the broker.");
+ }
+ return packagesManagement;
}
private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
- PackagesManagement packagesManagementService = new
PackagesManagementImpl();
- this.packagesManagement = Optional.of(packagesManagementService);
+ this.packagesManagement = new PackagesManagementImpl();
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new
DefaultPackagesStorageConfiguration();
storageConfiguration.setProperty(config.getProperties());
PackagesStorage storage =
storageProvider.getStorage(storageConfiguration);
storage.initialize();
- packagesManagementService.initialize(storage);
+ this.packagesManagement.initialize(storage);
}
public Optional<Integer> getListenPortHTTP() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 35aa7cc2fdd..c80580b02f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -85,16 +85,13 @@ public class TransactionMetadataStoreService {
private final Timer transactionOpRetryTimer;
// this semaphore for loading one transaction coordinator with the same tc
id on the same time
private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
- // one connect request open the transactionMetaStore the other request
will add to the queue, when the open op
- // finished the request will be poll and complete the future
+ // one connect request opens the transactionMetaStore the other request
will add to the queue, when the open op
+ // finishes the request will be polled and will complete the future
private final
ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>>
pendingConnectRequests;
private final ExecutorService internalPinnedExecutor;
private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
- private final ThreadFactory threadFactory =
- new
ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
-
public TransactionMetadataStoreService(TransactionMetadataStoreProvider
transactionMetadataStoreProvider,
PulsarService pulsarService,
TransactionBufferClient tbClient,
@@ -108,6 +105,8 @@ public class TransactionMetadataStoreService {
this.tcLoadSemaphores =
ConcurrentLongHashMap.<Semaphore>newBuilder().build();
this.pendingConnectRequests =
ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
+ ThreadFactory threadFactory =
+ new
ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
this.internalPinnedExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
}
@@ -200,7 +199,7 @@ public class TransactionMetadataStoreService {
// then handle the requests witch in the queue
deque.add(completableFuture);
if (LOG.isDebugEnabled()) {
- LOG.debug("Handle tc client connect added into
pending queue! tcId : {}", tcId.toString());
+ LOG.debug("Handle tc client connect added into
pending queue! tcId : {}", tcId);
}
}
})).exceptionally(ex -> {
@@ -367,17 +366,11 @@ public class TransactionMetadataStoreService {
private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus,
int txnAction,
TxnID txnID,
TxnStatus expectStatus) {
- boolean isLegal;
- switch (txnStatus) {
- case COMMITTING:
- isLegal = (txnAction == TxnAction.COMMIT.getValue());
- break;
- case ABORTING:
- isLegal = (txnAction == TxnAction.ABORT.getValue());
- break;
- default:
- isLegal = false;
- }
+ boolean isLegal = switch (txnStatus) {
+ case COMMITTING -> (txnAction == TxnAction.COMMIT.getValue());
+ case ABORTING -> (txnAction == TxnAction.ABORT.getValue());
+ default -> false;
+ };
if (!isLegal) {
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {},
TxnAction : {}", txnID, txnAction);
@@ -502,15 +495,14 @@ public class TransactionMetadataStoreService {
public void close () {
this.internalPinnedExecutor.shutdown();
- stores.forEach((tcId, metadataStore) -> {
+ stores.forEach((tcId, metadataStore) ->
metadataStore.closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("Close transaction metadata store with id " +
tcId, ex);
} else {
LOG.info("Removed and closed transaction meta store {}",
tcId);
}
- });
- });
+ }));
stores.clear();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 17bff57b85c..2cce68b60cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
* LoadManager runs through set of load reports collected from different
brokers and generates a recommendation of
* namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete
Load Manager will use different algorithms to
* generate this mapping.
- *
+ * <p>
* Concrete Load Manager is also return the least loaded broker that should
own the new namespace.
*/
public interface LoadManager {
@@ -88,7 +88,7 @@ public interface LoadManager {
/**
* Publish the current load report on ZK, forced or not.
- * By default rely on method writeLoadReportOnZookeeper().
+ * By default, rely on method writeLoadReportOnZookeeper().
*/
default void writeLoadReportOnZookeeper(boolean force) throws Exception {
writeLoadReportOnZookeeper();
@@ -118,15 +118,15 @@ public interface LoadManager {
* Removes visibility of current broker from loadbalancer list so, other
brokers can't redirect any request to this
* broker and this broker won't accept new connection requests.
*
- * @throws Exception
+ * @throws Exception if there is any error while disabling broker
*/
void disableBroker() throws Exception;
/**
* Get list of available brokers in cluster.
*
- * @return
- * @throws Exception
+ * @return the list of available brokers
+ * @throws Exception if there is any error while getting available brokers
*/
Set<String> getAvailableBrokers() throws Exception;
@@ -150,12 +150,11 @@ public interface LoadManager {
// Assume there is a constructor with one argument of
PulsarService.
final Object loadManagerInstance =
Reflections.createInstance(conf.getLoadManagerClassName(),
Thread.currentThread().getContextClassLoader());
- if (loadManagerInstance instanceof LoadManager) {
- final LoadManager casted = (LoadManager) loadManagerInstance;
+ if (loadManagerInstance instanceof LoadManager casted) {
casted.initialize(pulsar);
return casted;
- } else if (loadManagerInstance instanceof ModularLoadManager) {
- final LoadManager casted = new
ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
+ } else if (loadManagerInstance instanceof ModularLoadManager
modularLoadManager) {
+ final LoadManager casted = new
ModularLoadManagerWrapper(modularLoadManager);
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ExtensibleLoadManager) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9be8d4938e3..585d62c5b1f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
@@ -248,28 +249,27 @@ public class NamespaceService implements AutoCloseable {
/**
* Return the URL of the broker who's owning a particular service unit in
asynchronous way.
- *
+ * <p>
* If the service unit is not owned, return a CompletableFuture with empty
optional.
*/
public CompletableFuture<Optional<URL>>
getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) {
- if (suName instanceof TopicName) {
- TopicName name = (TopicName) suName;
+ if (suName instanceof TopicName name) {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting web service URL of topic: {} - options:
{}", name, options);
}
return getBundleAsync(name)
.thenCompose(namespaceBundle ->
- internalGetWebServiceUrl(Optional.of(name),
namespaceBundle, options));
+ internalGetWebServiceUrl(name, namespaceBundle,
options));
}
- if (suName instanceof NamespaceName) {
- return getFullBundleAsync((NamespaceName) suName)
+ if (suName instanceof NamespaceName namespaceName) {
+ return getFullBundleAsync(namespaceName)
.thenCompose(namespaceBundle ->
- internalGetWebServiceUrl(Optional.empty(),
namespaceBundle, options));
+ internalGetWebServiceUrl(null, namespaceBundle,
options));
}
- if (suName instanceof NamespaceBundle) {
- return internalGetWebServiceUrl(Optional.empty(),
(NamespaceBundle) suName, options);
+ if (suName instanceof NamespaceBundle namespaceBundle) {
+ return internalGetWebServiceUrl(null, namespaceBundle, options);
}
throw new IllegalArgumentException("Unrecognized class of
NamespaceBundle: " + suName.getClass().getName());
@@ -277,7 +277,7 @@ public class NamespaceService implements AutoCloseable {
/**
* Return the URL of the broker who's owning a particular service unit.
- *
+ * <p>
* If the service unit is not owned, return an empty optional
*/
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions
options) throws Exception {
@@ -285,7 +285,7 @@ public class NamespaceService implements AutoCloseable {
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
}
- private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(Optional<ServiceUnitId> topic,
+ private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
NamespaceBundle bundle,
LookupOptions options) {
@@ -306,7 +306,7 @@ public class NamespaceService implements AutoCloseable {
}
CompletableFuture<Optional<LookupResult>> future =
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
- ? loadManager.get().findBrokerServiceUrl(topic, bundle) :
+ ?
loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
findBrokerServiceUrl(bundle, options);
return future.thenApply(lookupResult -> {
@@ -329,7 +329,7 @@ public class NamespaceService implements AutoCloseable {
/**
* Register all the bootstrap name spaces including the heartbeat
namespace.
*
- * @throws PulsarServerException
+ * @throws PulsarServerException if an unexpected error occurs
*/
public void registerBootstrapNamespaces() throws PulsarServerException {
@@ -352,20 +352,19 @@ public class NamespaceService implements AutoCloseable {
}
/**
- * Tried to registers a namespace to this instance.
+ * Tries to register a namespace to this instance.
*
- * @param nsname
- * @param ensureOwned
- * @return
- * @throws PulsarServerException
- * @throws Exception
+ * @param nsname namespace name
+ * @param ensureOwned sets the behavior when the namespace is already
owned by another broker.
+ * If this flag is set to true, then the method will
throw an exception.
+ * If this flag is set to false, then the method will
return false.
+ * @return true if the namespace was successfully registered, false
otherwise
+ * @throws PulsarServerException if an error occurs when registering the
namespace
*/
public boolean registerNamespace(NamespaceName nsname, boolean
ensureOwned) throws PulsarServerException {
try {
- NamespaceBundle nsFullBundle = null;
-
// all pre-registered namespace is assumed to have bundles disabled
- nsFullBundle = bundleFactory.getFullBundle(nsname);
+ NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
final NamespaceEphemeralData otherData;
if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
@@ -417,10 +416,9 @@ public class NamespaceService implements AutoCloseable {
/**
* Main internal method to lookup and setup ownership of service unit to a
broker.
*
- * @param bundle
- * @param options
- * @return
- * @throws PulsarServerException
+ * @param bundle the namespace bundle
+ * @param options the lookup options
+ * @return the lookup result
*/
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
NamespaceBundle bundle, LookupOptions options) {
@@ -440,7 +438,7 @@ public class NamespaceService implements AutoCloseable {
// First check if we or someone else already owns the bundle
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
- if (!nsData.isPresent()) {
+ if (nsData.isEmpty()) {
// No one owns this bundle
if (options.isReadOnly()) {
@@ -448,9 +446,7 @@ public class NamespaceService implements AutoCloseable {
future.complete(Optional.empty());
} else {
// Now, no one owns the namespace yet. Hence, we will
try to dynamically assign it
- pulsar.getExecutor().execute(() -> {
- searchForCandidateBroker(bundle, future, options);
- });
+ pulsar.getExecutor().execute(() ->
searchForCandidateBroker(bundle, future, options));
}
} else if (nsData.get().isDisabled()) {
future.completeExceptionally(
@@ -474,7 +470,6 @@ public class NamespaceService implements AutoCloseable {
url == null ? null : url.toString(),
urlTls == null ? null :
urlTls.toString())));
}
- return;
} else {
future.complete(Optional.of(new
LookupResult(nsData.get())));
}
@@ -502,13 +497,13 @@ public class NamespaceService implements AutoCloseable {
new IllegalStateException("The leader election has not yet
been completed!"));
return;
}
- String candidateBroker = null;
+ String candidateBroker;
String candidateBrokerAdvertisedAddr = null;
LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
// The leader election service was not initialized yet. This can
happen because the broker service is
- // initialized first and it might start receiving lookup requests
before the leader election service is
+ // initialized first, and it might start receiving lookup requests
before the leader election service is
// fully initialized.
LOG.warn("Leader election service isn't initialized yet. "
+ "Returning empty result to lookup.
NamespaceBundle[{}]",
@@ -548,7 +543,7 @@ public class NamespaceService implements AutoCloseable {
&&
isBrokerActive(currentLeader.get().getServiceUrl());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
- if (!currentLeader.isPresent()) {
+ if (currentLeader.isEmpty()) {
LOG.warn(
"The information about the current
leader broker wasn't available. "
+ "Handling load manager
decisions in a decentralized way. "
@@ -565,7 +560,7 @@ public class NamespaceService implements AutoCloseable {
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<Pair<String, String>> availableBroker =
getLeastLoadedFromLoadManager(bundle);
- if (!availableBroker.isPresent()) {
+ if (availableBroker.isEmpty()) {
LOG.warn("Load manager didn't return any available
broker. "
+ "Returning empty result to
lookup. NamespaceBundle[{}]",
bundle);
@@ -603,7 +598,7 @@ public class NamespaceService implements AutoCloseable {
// Found owner for the namespace bundle
if (options.isLoadTopicsInBundle()) {
- // Schedule the task to pre-load topics
+ // Schedule the task to preload topics
pulsar.loadNamespaceTopics(bundle);
}
// find the target
@@ -614,7 +609,6 @@ public class NamespaceService implements AutoCloseable {
lookupFuture.completeExceptionally(
new PulsarServerException("the broker
do not have "
+
options.getAdvertisedListenerName() + " listener"));
- return;
} else {
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
@@ -622,11 +616,9 @@ public class NamespaceService implements AutoCloseable {
new LookupResult(ownerInfo,
url == null ? null :
url.toString(),
urlTls == null ? null :
urlTls.toString())));
- return;
}
} else {
lookupFuture.complete(Optional.of(new
LookupResult(ownerInfo)));
- return;
}
}
}).exceptionally(exception -> {
@@ -712,7 +704,7 @@ public class NamespaceService implements AutoCloseable {
} else {
LOG.warn("Broker {} ({}) couldn't be found in available brokers
{}",
candidateBroker, candidateBrokerHostAndPort,
-
availableBrokers.stream().collect(Collectors.joining(",")));
+ String.join(",", availableBrokers));
return false;
}
}
@@ -722,8 +714,7 @@ public class NamespaceService implements AutoCloseable {
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + candidateBroker + "'
isn't an URI.");
}
- String candidateBrokerHostAndPort =
candidateBroker.substring(uriSeparatorPos + 3);
- return candidateBrokerHostAndPort;
+ return candidateBroker.substring(uriSeparatorPos + 3);
}
private Set<String> getAvailableBrokers() {
@@ -737,12 +728,13 @@ public class NamespaceService implements AutoCloseable {
/**
* Helper function to encapsulate the logic to invoke between old and new
load manager.
*
- * @return
- * @throws Exception
+ * @param serviceUnit the service unit
+ * @return the least loaded broker addresses
+ * @throws Exception if an error occurs
*/
private Optional<Pair<String, String>>
getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
Optional<ResourceUnit> leastLoadedBroker =
loadManager.get().getLeastLoaded(serviceUnit);
- if (!leastLoadedBroker.isPresent()) {
+ if (leastLoadedBroker.isEmpty()) {
LOG.warn("No broker is available for {}", serviceUnit);
return Optional.empty();
}
@@ -863,7 +855,7 @@ public class NamespaceService implements AutoCloseable {
public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws
Exception {
try {
- // Does ZooKeeper says that the namespace is disabled?
+ // Does ZooKeeper say that the namespace is disabled?
CompletableFuture<Optional<NamespaceEphemeralData>> nsDataFuture =
ownershipCache.getOwnerAsync(bundle);
if (nsDataFuture != null) {
Optional<NamespaceEphemeralData> nsData =
nsDataFuture.getNow(null);
@@ -886,12 +878,14 @@ public class NamespaceService implements AutoCloseable {
/**
* 1. split the given bundle into two bundles 2. assign ownership of both
the bundles to current broker 3. update
* policies with newly created bundles into LocalZK 4. disable original
bundle and refresh the cache.
- *
+ * <p>
* It will call splitAndOwnBundleOnceAndRetry to do the real retry work,
which will retry "retryTimes".
*
- * @param bundle
- * @return
- * @throws Exception
+ * @param bundle the bundle to split
+ * @param unload whether to unload the new split bundles
+ * @param splitAlgorithm the algorithm to split the bundle
+ * @param boundaries the boundaries to split the bundle
+ * @return a future that will complete when the bundle is split and owned
*/
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
boolean unload,
NamespaceBundleSplitAlgorithm splitAlgorithm,
@@ -926,36 +920,36 @@ public class NamespaceService implements AutoCloseable {
}
try {
bundleFactory.splitBundles(bundle, splitBoundaries.size()
+ 1, splitBoundaries)
- .thenAccept(splittedBundles -> {
+ .thenAccept(splitBundles -> {
// Split and updateNamespaceBundles. Update
may fail because of concurrent write to
// Zookeeper.
- if (splittedBundles == null) {
+ if (splitBundles == null) {
String msg = format("bundle %s not found
under namespace", bundle.toString());
LOG.warn(msg);
updateFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg));
return;
}
-
Objects.requireNonNull(splittedBundles.getLeft());
-
Objects.requireNonNull(splittedBundles.getRight());
-
checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1,
+ Objects.requireNonNull(splitBundles.getLeft());
+
Objects.requireNonNull(splitBundles.getRight());
+ checkArgument(splitBundles.getRight().size()
== splitBoundaries.size() + 1,
"bundle has to be split in " +
(splitBoundaries.size() + 1) + " bundles");
NamespaceName nsname =
bundle.getNamespaceObject();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] splitAndOwnBundleOnce: {},
counter: {}, bundles: {}",
nsname.toString(),
bundle.getBundleRange(), counter.get(),
- splittedBundles.getRight());
+ splitBundles.getRight());
}
try {
// take ownership of newly split bundles
- for (NamespaceBundle sBundle :
splittedBundles.getRight()) {
+ for (NamespaceBundle sBundle :
splitBundles.getRight()) {
Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
}
- updateNamespaceBundles(nsname,
splittedBundles.getLeft()).thenCompose(__ -> {
- return
updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft());
- }).thenRun(() -> {
-
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-
updateFuture.complete(splittedBundles.getRight());
+ updateNamespaceBundles(nsname,
splitBundles.getLeft()).thenCompose(__ ->
+
updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft()))
+ .thenRun(() -> {
+
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+
updateFuture.complete(splitBundles.getRight());
}).exceptionally(ex1 -> {
String msg = format("failed to update
namespace policies [%s], "
+ "NamespaceBundle: %s
due to %s",
@@ -1023,7 +1017,7 @@ public class NamespaceService implements AutoCloseable {
.exceptionally(e -> {
String msg1 = format(
"failed to disable bundle %s under
namespace [%s] with error %s",
- bundle.getNamespaceObject().toString(),
bundle.toString(), ex.getMessage());
+ bundle.getNamespaceObject().toString(),
bundle, ex.getMessage());
LOG.warn(msg1, e);
completionFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg1));
return null;
@@ -1093,9 +1087,8 @@ public class NamespaceService implements AutoCloseable {
* Update new bundle-range to admin/policies/namespace.
* Update may fail because of concurrent write to Zookeeper.
*
- * @param nsname
- * @param nsBundles
- * @throws Exception
+ * @param nsname the namespace name
+ * @param nsBundles the new namespace bundles
*/
public CompletableFuture<Void>
updateNamespaceBundlesForPolicies(NamespaceName nsname,
NamespaceBundles nsBundles) {
@@ -1122,9 +1115,8 @@ public class NamespaceService implements AutoCloseable {
* Update new bundle-range to LocalZk (create a new node if not present).
* Update may fail because of concurrent write to Zookeeper.
*
- * @param nsname
- * @param nsBundles
- * @throws Exception
+ * @param nsname the namespace name
+ * @param nsBundles the new namespace bundles
*/
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName
nsname, NamespaceBundles nsBundles) {
Objects.requireNonNull(nsname);
@@ -1176,7 +1168,7 @@ public class NamespaceService implements AutoCloseable {
}
/**
- * @Deprecated This method is only used in test now.
+ * @deprecated This method is only used in test now.
*/
@Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
@@ -1197,7 +1189,7 @@ public class NamespaceService implements AutoCloseable {
}
return getBundleAsync(topicName).thenCompose(bundle -> {
Optional<CompletableFuture<OwnedBundle>> optionalFuture =
ownershipCache.getOwnedBundleAsync(bundle);
- if (!optionalFuture.isPresent()) {
+ if (optionalFuture.isEmpty()) {
return CompletableFuture.completedFuture(false);
}
return optionalFuture.get().thenApply(ob -> ob != null &&
ob.isActive());
@@ -1220,7 +1212,7 @@ public class NamespaceService implements AutoCloseable {
return getBundleAsync(topic)
.thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
}
- return getBundleAsync(topic).thenApply(bundle ->
ownershipCache.isNamespaceBundleOwned(bundle));
+ return
getBundleAsync(topic).thenApply(ownershipCache::isNamespaceBundleOwned);
}
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName)
{
@@ -1462,21 +1454,19 @@ public class NamespaceService implements AutoCloseable {
if (peerClusterData != null) {
return
getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
} else {
- // Non-persistent topics don't have managed ledgers so
we have to retrieve them from local
+ // Non-persistent topics don't have managed ledgers.
So we have to retrieve them from local
// cache.
List<String> topics = new ArrayList<>();
synchronized
(pulsar.getBrokerService().getMultiLayerTopicMap()) {
if
(pulsar.getBrokerService().getMultiLayerTopicMap()
.containsKey(namespaceName.toString())) {
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
- .forEach((__, bundle) -> {
- bundle.forEach((topicName, topic)
-> {
- if (topic instanceof
NonPersistentTopic
- &&
((NonPersistentTopic) topic).isActive()) {
- topics.add(topicName);
- }
- });
- });
+ .forEach((__, bundle) ->
bundle.forEach((topicName, topic) -> {
+ if (topic instanceof
NonPersistentTopic
+ && ((NonPersistentTopic)
topic).isActive()) {
+ topics.add(topicName);
+ }
+ }));
}
}
@@ -1545,14 +1535,10 @@ public class NamespaceService implements AutoCloseable {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
return
extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
- .thenCompose(lookupData -> {
- if (lookupData.isPresent()) {
- return CompletableFuture.completedFuture(
-
Optional.of(lookupData.get().toNamespaceEphemeralData()));
- } else {
- return
CompletableFuture.completedFuture(Optional.empty());
- }
- });
+ .thenCompose(lookupData -> lookupData
+ .map(brokerLookupData ->
+
CompletableFuture.completedFuture(Optional.of(brokerLookupData.toNamespaceEphemeralData())))
+ .orElseGet(() ->
CompletableFuture.completedFuture(Optional.empty())));
}
return ownershipCache.getOwnerAsync(bundle);
}
@@ -1576,7 +1562,6 @@ public class NamespaceService implements AutoCloseable {
}
public void unloadSLANamespace() throws Exception {
- PulsarAdmin adminClient = null;
NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
LOG.info("Checking owner for SLA namespace {}", namespaceName);
@@ -1589,7 +1574,7 @@ public class NamespaceService implements AutoCloseable {
}
LOG.info("Trying to unload SLA namespace {}", namespaceName);
- adminClient = pulsar.getAdminClient();
+ PulsarAdmin adminClient = pulsar.getAdminClient();
adminClient.namespaces().unload(namespaceName.toString());
LOG.info("Namespace {} unloaded successfully", namespaceName);
}
@@ -1662,7 +1647,7 @@ public class NamespaceService implements AutoCloseable {
/**
* used for filtering bundles in special namespace.
- * @param namespace
+ * @param namespace the namespace name
* @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
*/
public static boolean filterNamespaceForShedding(String namespace) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bfb94aa7740..fa121b8eb4d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -309,7 +309,7 @@ public abstract class PulsarWebResource {
}
return
pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
.thenCompose(tenantInfoOptional -> {
- if (!tenantInfoOptional.isPresent()) {
+ if (tenantInfoOptional.isEmpty()) {
throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
}
TenantInfo tenantInfo = tenantInfoOptional.get();
@@ -380,9 +380,11 @@ public abstract class PulsarWebResource {
/**
* It validates that peer-clusters can't coexist in replication-clusters.
*
- * @clusterName: given cluster whose peer-clusters can't be present into
replication-cluster list
- * @replicationClusters: replication-cluster list
+ * @param clusterName given cluster whose peer-clusters can't be present
into replication-cluster list
+ * @param replicationClusters replication-cluster list
+ * @deprecated use {@link #validatePeerClusterConflictAsync(String, Set)}
instead
*/
+ @Deprecated
protected void validatePeerClusterConflict(String clusterName, Set<String>
replicationClusters) {
try {
ClusterData clusterData =
clusterResources().getCluster(clusterName).orElseThrow(
@@ -453,7 +455,7 @@ public abstract class PulsarWebResource {
protected CompletableFuture<Void> validateClusterForTenantAsync(String
tenant, String cluster) {
return
pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
.thenAccept(tenantInfo -> {
- if (!tenantInfo.isPresent()) {
+ if (tenantInfo.isEmpty()) {
throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
}
if
(!tenantInfo.get().getAllowedClusters().contains(cluster)) {
@@ -488,7 +490,7 @@ public abstract class PulsarWebResource {
* Check if the cluster exists and redirect the call to the owning cluster.
*
* @param cluster Cluster name
- * @throws Exception In case the redirect happens
+ * @throws WebApplicationException In case the redirect happens
*/
protected void validateClusterOwnership(String cluster) throws
WebApplicationException {
sync(()-> validateClusterOwnershipAsync(cluster));
@@ -550,11 +552,8 @@ public abstract class PulsarWebResource {
return true;
}
- if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
- // Without authorization, any cluster name should be valid and
accepted by the broker
- return true;
- }
- return false;
+ // Without authorization, any cluster name should be valid and
accepted by the broker
+ return !pulsarService.getConfiguration().isAuthorizationEnabled();
}
protected void validateBundleOwnership(String tenant, String cluster,
String namespace, boolean authoritative,
@@ -611,7 +610,7 @@ public abstract class PulsarWebResource {
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
- return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(optionUrl -> optionUrl.isPresent());
+ return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(Optional::isPresent);
}
protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName
fqnn, BundlesData bundles,
@@ -664,7 +663,7 @@ public abstract class PulsarWebResource {
.loadTopicsInBundle(false).build();
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
// Ensure we get a url
- if (webUrl == null || !webUrl.isPresent()) {
+ if (webUrl.isEmpty()) {
log.warn("Unable to get web service url");
throw new RestException(Status.PRECONDITION_FAILED,
"Failed to find ownership for ServiceUnit:" +
bundle.toString());
@@ -697,8 +696,6 @@ public abstract class PulsarWebResource {
} catch (NullPointerException e) {
log.warn("Unable to get web service url");
throw new RestException(Status.PRECONDITION_FAILED, "Failed to
find ownership for ServiceUnit:" + bundle);
- } catch (WebApplicationException wae) {
- throw wae;
}
}
@@ -712,7 +709,7 @@ public abstract class PulsarWebResource {
.loadTopicsInBundle(false).build();
return nsService.getWebServiceUrlAsync(bundle, options)
.thenCompose(webUrl -> {
- if (webUrl == null || !webUrl.isPresent()) {
+ if (webUrl.isEmpty()) {
log.warn("Unable to get web service url");
throw new RestException(Status.PRECONDITION_FAILED,
"Failed to find ownership for ServiceUnit:" +
bundle.toString());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
index 16d87baedbe..afebbd276eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
@@ -62,7 +62,7 @@ public class RequestWrapper extends HttpServletRequestWrapper
{
}
- public int read() throws IOException {
+ public int read() {
return byteArrayInputStream.read();
}
};
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index efed6140395..3fa00beea1f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -76,24 +76,24 @@ public class ResponseHandlerFilter implements Filter {
if (request.isAsyncSupported() && request.isAsyncStarted()) {
request.getAsyncContext().addListener(new AsyncListener() {
@Override
- public void onComplete(AsyncEvent asyncEvent) throws
IOException {
+ public void onComplete(AsyncEvent asyncEvent) {
handleInterceptor(request, response);
}
@Override
- public void onTimeout(AsyncEvent asyncEvent) throws
IOException {
+ public void onTimeout(AsyncEvent asyncEvent) {
LOG.warn("Http request {} async context timeout.",
request);
handleInterceptor(request, response);
}
@Override
- public void onError(AsyncEvent asyncEvent) throws IOException {
+ public void onError(AsyncEvent asyncEvent) {
LOG.warn("Http request {} async context error.", request,
asyncEvent.getThrowable());
handleInterceptor(request, response);
}
@Override
- public void onStartAsync(AsyncEvent asyncEvent) throws
IOException {
+ public void onStartAsync(AsyncEvent asyncEvent) {
// nothing to do
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index b18aa1c787a..c3ae3a495cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.common.policies.data.ErrorData;
/**
* Exception used to provide better error messages to clients of the REST API.
*/
-@SuppressWarnings("serial")
public class RestException extends WebApplicationException {
private Throwable cause = null;
static String getExceptionData(Throwable t) {
@@ -75,8 +74,7 @@ public class RestException extends WebApplicationException {
}
private static Response getResponse(Throwable t) {
- if (t instanceof WebApplicationException) {
- WebApplicationException e = (WebApplicationException) t;
+ if (t instanceof WebApplicationException e) {
return e.getResponse();
} else {
return Response
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 2d6a6af5847..eada0436f4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -264,9 +264,7 @@ public class WebService implements AutoCloseable {
context.setContextPath(path);
context.addServlet(servletHolder, MATCH_ALL);
if (attributeMap != null) {
- attributeMap.forEach((key, value) -> {
- context.setAttribute(key, value);
- });
+ attributeMap.forEach(context::setAttribute);
}
filterInitializer.addFilters(context, requiresAuthentication);
handlers.add(context);