This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d3a2ee444ecb54076b15dd41d69701658b435a4e Author: Lari Hotari <[email protected]> AuthorDate: Wed Jan 19 06:41:12 2022 +0200 [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812) * [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits - should release permit in try-finally block * Cleanup code in LookupProxyHandler (cherry picked from commit 85b62e050b01b591a4b5751aab48b418ac9e4e76) --- .../pulsar/proxy/server/LookupProxyHandler.java | 91 ++++++++++------------ 1 file changed, 40 insertions(+), 51 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index b14bea5..30e11a1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -18,14 +18,14 @@ */ package org.apache.pulsar.proxy.server; -import static org.apache.commons.lang3.StringUtils.isBlank; - +import io.netty.buffer.ByteBuf; +import io.prometheus.client.Counter; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; - +import java.util.concurrent.Semaphore; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.protocol.Commands; @@ -42,13 +42,10 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.prometheus.client.Counter; - public class LookupProxyHandler { private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests"; - private final ProxyService service; private final ProxyConnection proxyConnection; + private final BrokerDiscoveryProvider discoveryProvider; private final boolean connectWithTLS; private SocketAddress clientAddress; @@ -83,9 +80,11 @@ public class LookupProxyHandler { .build("pulsar_proxy_rejected_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests rejected due to throttling") .create().register(); + private final Semaphore lookupRequestSemaphore; public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { - this.service = proxy; + this.discoveryProvider = proxy.getDiscoveryProvider(); + this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore(); this.proxyConnection = proxyConnection; this.clientAddress = proxyConnection.clientAddress(); this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker(); @@ -98,28 +97,16 @@ public class LookupProxyHandler { log.debug("Received Lookup from {}", clientAddress); } long clientRequestId = lookup.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - lookupRequests.inc(); - String topic = lookup.getTopic(); - String serviceUrl; - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker = null; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, - e.getMessage(), clientRequestId)); - return; + if (lookupRequestSemaphore.tryAcquire()) { + try { + lookupRequests.inc(); + String serviceUrl = getBrokerServiceUrl(clientRequestId); + if (serviceUrl != null) { + performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10); } - serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() - : availableBroker.getPulsarServiceUrl(); - } else { - serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS() - : service.getConfiguration().getBrokerServiceURL(); + } finally { + lookupRequestSemaphore.release(); } - performLookup(clientRequestId, topic, serviceUrl, false, 10); - this.service.getLookupRequestSemaphore().release(); } else { rejectedLookupRequests.inc(); if (log.isDebugEnabled()) { @@ -203,9 +190,12 @@ public class LookupProxyHandler { log.debug("[{}] Received PartitionMetadataLookup", clientAddress); } final long clientRequestId = partitionMetadata.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - handlePartitionMetadataResponse(partitionMetadata, clientRequestId); - this.service.getLookupRequestSemaphore().release(); + if (lookupRequestSemaphore.tryAcquire()) { + try { + handlePartitionMetadataResponse(partitionMetadata, clientRequestId); + } finally { + lookupRequestSemaphore.release(); + } } else { rejectedPartitionsMetadataRequests.inc(); if (log.isDebugEnabled()) { @@ -277,9 +267,12 @@ public class LookupProxyHandler { final long requestId = commandGetTopicsOfNamespace.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId); - this.service.getLookupRequestSemaphore().release(); + if (lookupRequestSemaphore.tryAcquire()) { + try { + handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId); + } finally { + lookupRequestSemaphore.release(); + } } else { rejectedGetTopicsOfNamespaceRequests.inc(); if (log.isDebugEnabled()) { @@ -408,24 +401,20 @@ public class LookupProxyHandler { * Get default broker service url or discovery an available broker **/ private String getBrokerServiceUrl(long clientRequestId) { - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newError( - clientRequestId, ServerError.ServiceNotReady, e.getMessage() - )); - return null; - } - return this.connectWithTLS ? - availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); - } else { - return this.connectWithTLS ? - service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); + if (StringUtils.isNotBlank(brokerServiceURL)) { + return brokerServiceURL; } - + ServiceLookupData availableBroker; + try { + availableBroker = discoveryProvider.nextBroker(); + } catch (Exception e) { + log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); + proxyConnection.ctx().writeAndFlush(Commands.newError( + clientRequestId, ServerError.ServiceNotReady, e.getMessage() + )); + return null; + } + return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); } private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
