This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 59931bdca4ebb8dc011e9eeb6c9bc4e6240bd34f Author: Rudy Steiner <[email protected]> AuthorDate: Wed May 19 23:51:10 2021 +0800 Enable AutoTopicCreationType partitioned by proxy (#8048) (cherry picked from commit c24df3355f4312b2eb4a62f0f0497367fac1dadc) --- .../pulsar/proxy/server/LookupProxyHandler.java | 104 ++++++++++----------- .../org/apache/pulsar/proxy/server/ProxyTest.java | 28 +++++- 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index f2d7242..82300a3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -216,65 +216,56 @@ public class LookupProxyHandler { } } + /** + * Always get partition metadata from broker service. + * + * + **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { TopicName topicName = TopicName.get(partitionMetadata.getTopic()); - if (isBlank(brokerServiceURL)) { - service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName, - proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of partitions for topic {} is {}", - proxyConnection.clientAuthRole, topicName, metadata.partitions); - } - proxyConnection.ctx().writeAndFlush( - Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId)); - }).exceptionally(ex -> { - log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, topicName, - ex.getMessage(), ex); - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse( - getServerError(ex), ex.getMessage(), clientRequestId)); - return null; - }); - } else { - URI brokerURI; - try { - brokerURI = new URI(brokerServiceURL); - } catch (URISyntaxException e) { - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, - e.getMessage(), clientRequestId)); + URI brokerURI; + try { + String availableBrokerServiceURL = getBrokerServiceUrl(clientRequestId); + if (availableBrokerServiceURL == null) { + log.warn("No available broker for {} to lookup partition metadata", topicName); return; } - InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort()); - - if (log.isDebugEnabled()) { - log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, - topicName.getPartitionedTopicName(), clientRequestId); - } + brokerURI = new URI(availableBrokerServiceURL); + } catch (URISyntaxException e) { + proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, + e.getMessage(), clientRequestId)); + return; + } + InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort()); - proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { - // Connected to backend broker - long requestId = proxyConnection.newRequestId(); - ByteBuf command; - command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); - clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { - if (t != null) { - log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), - t.getMessage(), t); - proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t), - t.getMessage(), clientRequestId)); - } else { - proxyConnection.ctx().writeAndFlush( - Commands.newPartitionMetadataResponse(r.partitions, clientRequestId)); - } - proxyConnection.getConnectionPool().releaseConnection(clientCnx); - }); - }).exceptionally(ex -> { - // Failed to connect to backend broker - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - ex.getMessage(), clientRequestId)); - return null; - }); + if (log.isDebugEnabled()) { + log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, + topicName.getPartitionedTopicName(), clientRequestId); } + proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { + // Connected to backend broker + long requestId = proxyConnection.newRequestId(); + ByteBuf command; + command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); + clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { + if (t != null) { + log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), + t.getMessage(), t); + proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t), + t.getMessage(), clientRequestId)); + } else { + proxyConnection.ctx().writeAndFlush( + Commands.newPartitionMetadataResponse(r.partitions, clientRequestId)); + } + proxyConnection.getConnectionPool().releaseConnection(clientCnx); + }); + }).exceptionally(ex -> { + // Failed to connect to backend broker + proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, + ex.getMessage(), clientRequestId)); + return null; + }); } public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { @@ -302,7 +293,7 @@ public class LookupProxyHandler { private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) { - String serviceUrl = getServiceUrl(clientRequestId); + String serviceUrl = getBrokerServiceUrl(clientRequestId); if(!StringUtils.isNotBlank(serviceUrl)) { return; @@ -364,7 +355,7 @@ public class LookupProxyHandler { } final long clientRequestId = commandGetSchema.getRequestId(); - String serviceUrl = getServiceUrl(clientRequestId); + String serviceUrl = getBrokerServiceUrl(clientRequestId); String topic = commandGetSchema.getTopic(); if(!StringUtils.isNotBlank(serviceUrl)) { @@ -411,7 +402,10 @@ public class LookupProxyHandler { } - private String getServiceUrl(long clientRequestId) { + /** + * Get default broker service url or discovery an available broker + **/ + private String getBrokerServiceUrl(long clientRequestId) { if (isBlank(brokerServiceURL)) { ServiceLookupData availableBroker; try { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 187d56a..8106ef3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -23,14 +23,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; - import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; - +import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import lombok.Data; import lombok.EqualsAndHashCode; @@ -188,6 +187,29 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { } } + /** + * test auto create partitioned topic by proxy + **/ + @Test + public void testAutoCreateTopic() throws Exception{ + int defaultPartition=2; + int defaultNumPartitions=pulsar.getConfiguration().getDefaultNumPartitions(); + pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned"); + pulsar.getConfiguration().setDefaultNumPartitions(defaultPartition); + try { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) + .build(); + String topic = "persistent://sample/test/local/partitioned-proxy-topic"; + CompletableFuture<List<String>> partitionNamesFuture = client.getPartitionsForTopic(topic); + List<String> partitionNames = partitionNamesFuture.get(30000, TimeUnit.MILLISECONDS); + Assert.assertEquals(partitionNames.size(), defaultPartition); + }finally { + pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned"); + pulsar.getConfiguration().setDefaultNumPartitions(defaultNumPartitions); + } + } + @Test public void testRegexSubscription() throws Exception { @Cleanup
