This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dbe0518554bfb4306adf23335d62eb8959b83b27 Author: Lari Hotari <[email protected]> AuthorDate: Wed Jan 19 06:41:12 2022 +0200 [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812) * [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits - should release permit in try-finally block * Cleanup code in LookupProxyHandler (cherry picked from commit 85b62e050b01b591a4b5751aab48b418ac9e4e76) (cherry picked from commit dcc07e8ebacec86a3779b289e235dd7731aa208e) --- .../pulsar/proxy/server/LookupProxyHandler.java | 96 ++++++++++------------ 1 file changed, 45 insertions(+), 51 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index a8525e4da98..8c18d456cbb 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -18,14 +18,13 @@ */ package org.apache.pulsar.proxy.server; -import static org.apache.commons.lang3.StringUtils.isBlank; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; - +import java.util.concurrent.Semaphore; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.protocol.Commands; @@ -47,8 +46,8 @@ import io.prometheus.client.Counter; public class LookupProxyHandler { private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests"; - private final ProxyService service; private final ProxyConnection proxyConnection; + private final BrokerDiscoveryProvider discoveryProvider; private final boolean connectWithTLS; private SocketAddress clientAddress; @@ -83,9 +82,11 @@ public class LookupProxyHandler { .build("pulsar_proxy_rejected_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests rejected due to throttling") .create().register(); + private final Semaphore lookupRequestSemaphore; public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { - this.service = proxy; + this.discoveryProvider = proxy.getDiscoveryProvider(); + this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore(); this.proxyConnection = proxyConnection; this.clientAddress = proxyConnection.clientAddress(); this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker(); @@ -98,28 +99,16 @@ public class LookupProxyHandler { log.debug("Received Lookup from {}", clientAddress); } long clientRequestId = lookup.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - lookupRequests.inc(); - String topic = lookup.getTopic(); - String serviceUrl; - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker = null; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, - e.getMessage(), clientRequestId)); - return; + if (lookupRequestSemaphore.tryAcquire()) { + try { + lookupRequests.inc(); + String serviceUrl = getBrokerServiceUrl(clientRequestId); + if (serviceUrl != null) { + performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10); } - serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() - : availableBroker.getPulsarServiceUrl(); - } else { - serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS() - : service.getConfiguration().getBrokerServiceURL(); + } finally { + lookupRequestSemaphore.release(); } - performLookup(clientRequestId, topic, serviceUrl, false, 10); - this.service.getLookupRequestSemaphore().release(); } else { rejectedLookupRequests.inc(); if (log.isDebugEnabled()) { @@ -203,9 +192,12 @@ public class LookupProxyHandler { log.debug("[{}] Received PartitionMetadataLookup", clientAddress); } final long clientRequestId = partitionMetadata.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - handlePartitionMetadataResponse(partitionMetadata, clientRequestId); - this.service.getLookupRequestSemaphore().release(); + if (lookupRequestSemaphore.tryAcquire()) { + try { + handlePartitionMetadataResponse(partitionMetadata, clientRequestId); + } finally { + lookupRequestSemaphore.release(); + } } else { rejectedPartitionsMetadataRequests.inc(); if (log.isDebugEnabled()) { @@ -226,7 +218,7 @@ public class LookupProxyHandler { long clientRequestId) { TopicName topicName = TopicName.get(partitionMetadata.getTopic()); - String serviceUrl = getServiceUrl(clientRequestId); + String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { log.warn("No available broker for {} to lookup partition metadata", topicName); return; @@ -273,9 +265,12 @@ public class LookupProxyHandler { final long requestId = commandGetTopicsOfNamespace.getRequestId(); - if (this.service.getLookupRequestSemaphore().tryAcquire()) { - handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId); - this.service.getLookupRequestSemaphore().release(); + if (lookupRequestSemaphore.tryAcquire()) { + try { + handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId); + } finally { + lookupRequestSemaphore.release(); + } } else { rejectedGetTopicsOfNamespaceRequests.inc(); if (log.isDebugEnabled()) { @@ -290,7 +285,7 @@ public class LookupProxyHandler { private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) { - String serviceUrl = getServiceUrl(clientRequestId); + String serviceUrl = getBrokerServiceUrl(clientRequestId); if(!StringUtils.isNotBlank(serviceUrl)) { return; @@ -352,7 +347,7 @@ public class LookupProxyHandler { } final long clientRequestId = commandGetSchema.getRequestId(); - String serviceUrl = getServiceUrl(clientRequestId); + String serviceUrl = getBrokerServiceUrl(clientRequestId); String topic = commandGetSchema.getTopic(); Optional<SchemaVersion> schemaVersion; if (commandGetSchema.hasSchemaVersion()) { @@ -402,25 +397,24 @@ public class LookupProxyHandler { } - private String getServiceUrl(long clientRequestId) { - if (isBlank(brokerServiceURL)) { - ServiceLookupData availableBroker; - try { - availableBroker = service.getDiscoveryProvider().nextBroker(); - } catch (Exception e) { - log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); - proxyConnection.ctx().writeAndFlush(Commands.newError( - clientRequestId, ServerError.ServiceNotReady, e.getMessage() - )); - return null; - } - return this.connectWithTLS ? - availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); - } else { - return this.connectWithTLS ? - service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); + /** + * Get default broker service url or discovery an available broker. + **/ + private String getBrokerServiceUrl(long clientRequestId) { + if (StringUtils.isNotBlank(brokerServiceURL)) { + return brokerServiceURL; } - + ServiceLookupData availableBroker; + try { + availableBroker = discoveryProvider.nextBroker(); + } catch (Exception e) { + log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e); + proxyConnection.ctx().writeAndFlush(Commands.newError( + clientRequestId, ServerError.ServiceNotReady, e.getMessage() + )); + return null; + } + return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(); } private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
