This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 462d44f906fa4dd873c3e4767fcdce84bea20f48 Author: Lari Hotari <[email protected]> AuthorDate: Wed May 4 08:45:00 2022 +0300 [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (#15415) * [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler * Use existing code pattern for creating address (cherry picked from commit 7373a51690d728475d47846bfbcca4fa64f2e228) (cherry picked from commit 5980cdc109736ed7e0df7df0e031dcde73e99536) --- .../pulsar/proxy/server/LookupProxyHandler.java | 45 ++++++++++------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 82300a3ae7f..a8525e4da98 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetad import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,20 +225,16 @@ public class LookupProxyHandler { private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { TopicName topicName = TopicName.get(partitionMetadata.getTopic()); - URI brokerURI; - try { - String availableBrokerServiceURL = getBrokerServiceUrl(clientRequestId); - if (availableBrokerServiceURL == null) { - log.warn("No available broker for {} to lookup partition metadata", topicName); - return; - } - brokerURI = new URI(availableBrokerServiceURL); - } catch (URISyntaxException e) { - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, - e.getMessage(), clientRequestId)); + + String serviceUrl = getServiceUrl(clientRequestId); + if (serviceUrl == null) { + log.warn("No available broker for {} to lookup partition metadata", topicName); + return; + } + InetSocketAddress addr = getAddr(serviceUrl, clientRequestId); + if (addr == null) { return; } - InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort()); if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, @@ -293,7 +290,7 @@ public class LookupProxyHandler { private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) { - String serviceUrl = getBrokerServiceUrl(clientRequestId); + String serviceUrl = getServiceUrl(clientRequestId); if(!StringUtils.isNotBlank(serviceUrl)) { return; @@ -355,8 +352,14 @@ public class LookupProxyHandler { } final long clientRequestId = commandGetSchema.getRequestId(); - String serviceUrl = getBrokerServiceUrl(clientRequestId); + String serviceUrl = getServiceUrl(clientRequestId); String topic = commandGetSchema.getTopic(); + Optional<SchemaVersion> schemaVersion; + if (commandGetSchema.hasSchemaVersion()) { + schemaVersion = Optional.of(commandGetSchema.getSchemaVersion().toByteArray()).map(BytesSchemaVersion::of); + } else { + schemaVersion = Optional.empty(); + } if(!StringUtils.isNotBlank(serviceUrl)) { return; @@ -375,12 +378,9 @@ public class LookupProxyHandler { // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - byte[] schemaVersion = null; - if (commandGetSchema.hasSchemaVersion()) { - schemaVersion = commandGetSchema.getSchemaVersion().toByteArray(); - } - command = Commands.newGetSchema(requestId, topic, - Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of)); + + command = Commands.newGetSchema(requestId, topic, schemaVersion); + clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> { if (t != null) { log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t); @@ -402,10 +402,7 @@ public class LookupProxyHandler { } - /** - * Get default broker service url or discovery an available broker - **/ - private String getBrokerServiceUrl(long clientRequestId) { + private String getServiceUrl(long clientRequestId) { if (isBlank(brokerServiceURL)) { ServiceLookupData availableBroker; try {
