This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 59931bdca4ebb8dc011e9eeb6c9bc4e6240bd34f
Author: Rudy Steiner <[email protected]>
AuthorDate: Wed May 19 23:51:10 2021 +0800

    Enable AutoTopicCreationType partitioned by proxy (#8048)
    
    
    (cherry picked from commit c24df3355f4312b2eb4a62f0f0497367fac1dadc)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 104 ++++++++++-----------
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  28 +++++-
 2 files changed, 74 insertions(+), 58 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index f2d7242..82300a3 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -216,65 +216,56 @@ public class LookupProxyHandler {
         }
     }
 
+    /**
+     *   Always get partition metadata from broker service.
+     *
+     *
+     **/
     private void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata 
partitionMetadata,
             long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
-        if (isBlank(brokerServiceURL)) {
-            
service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
-                    proxyConnection.clientAuthRole, 
proxyConnection.authenticationData).thenAccept(metadata -> {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Total number of partitions for 
topic {} is {}",
-                                    proxyConnection.clientAuthRole, topicName, 
metadata.partitions);
-                        }
-                        proxyConnection.ctx().writeAndFlush(
-                                
Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
-                    }).exceptionally(ex -> {
-                        log.warn("[{}] Failed to get partitioned metadata for 
topic {} {}", clientAddress, topicName,
-                                ex.getMessage(), ex);
-                        
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
-                          getServerError(ex), ex.getMessage(), 
clientRequestId));
-                        return null;
-                    });
-        } else {
-            URI brokerURI;
-            try {
-                brokerURI = new URI(brokerServiceURL);
-            } catch (URISyntaxException e) {
-                
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
-                        e.getMessage(), clientRequestId));
+        URI brokerURI;
+        try {
+            String availableBrokerServiceURL = 
getBrokerServiceUrl(clientRequestId);
+            if (availableBrokerServiceURL == null) {
+                log.warn("No available broker for {} to lookup partition 
metadata", topicName);
                 return;
             }
-            InetSocketAddress addr = new 
InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
-
-            if (log.isDebugEnabled()) {
-                log.debug("Getting connections to '{}' for Looking up topic 
'{}' with clientReq Id '{}'", addr,
-                        topicName.getPartitionedTopicName(), clientRequestId);
-            }
+            brokerURI = new URI(availableBrokerServiceURL);
+        } catch (URISyntaxException e) {
+            
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                    e.getMessage(), clientRequestId));
+            return;
+        }
+        InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), 
brokerURI.getPort());
 
-            
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
-                // Connected to backend broker
-                long requestId = proxyConnection.newRequestId();
-                ByteBuf command;
-                command = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
-                clientCnx.newLookup(command, requestId).whenComplete((r, t) -> 
{
-                    if (t != null) {
-                        log.warn("[{}] failed to get Partitioned metadata : 
{}", topicName.toString(),
-                            t.getMessage(), t);
-                        
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
-                            t.getMessage(), clientRequestId));
-                    } else {
-                        proxyConnection.ctx().writeAndFlush(
-                            
Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
-                    }
-                    
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
-                });
-            }).exceptionally(ex -> {
-                // Failed to connect to backend broker
-                
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
-                        ex.getMessage(), clientRequestId));
-                return null;
-            });
+        if (log.isDebugEnabled()) {
+            log.debug("Getting connections to '{}' for Looking up topic '{}' 
with clientReq Id '{}'", addr,
+                    topicName.getPartitionedTopicName(), clientRequestId);
         }
+        
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
+            // Connected to backend broker
+            long requestId = proxyConnection.newRequestId();
+            ByteBuf command;
+            command = 
Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
+            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName.toString(),
+                        t.getMessage(), t);
+                    
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
+                        t.getMessage(), clientRequestId));
+                } else {
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newPartitionMetadataResponse(r.partitions, 
clientRequestId));
+                }
+                
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
+            });
+        }).exceptionally(ex -> {
+            // Failed to connect to backend broker
+            
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    ex.getMessage(), clientRequestId));
+            return null;
+        });
     }
 
     public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace) {
@@ -302,7 +293,7 @@ public class LookupProxyHandler {
 
     private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace,
                                             long clientRequestId) {
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -364,7 +355,7 @@ public class LookupProxyHandler {
         }
 
         final long clientRequestId = commandGetSchema.getRequestId();
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
         String topic = commandGetSchema.getTopic();
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
@@ -411,7 +402,10 @@ public class LookupProxyHandler {
 
     }
 
-    private String getServiceUrl(long clientRequestId) {
+    /**
+     *  Get default broker service url or discover an available broker
+     **/
+    private String getBrokerServiceUrl(long clientRequestId) {
         if (isBlank(brokerServiceURL)) {
             ServiceLookupData availableBroker;
             try {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 187d56a..8106ef3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -23,14 +23,13 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
-
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -188,6 +187,29 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
         }
     }
 
+    /**
+     * Test auto-creation of a partitioned topic through the proxy.
+     **/
+    @Test
+    public void testAutoCreateTopic() throws Exception{
+        int defaultPartition=2;
+        int 
defaultNumPartitions=pulsar.getConfiguration().getDefaultNumPartitions();
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(defaultPartition);
+        try {
+            @Cleanup
+            PulsarClient client = 
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+              .build();
+            String topic = 
"persistent://sample/test/local/partitioned-proxy-topic";
+            CompletableFuture<List<String>> partitionNamesFuture = 
client.getPartitionsForTopic(topic);
+            List<String> partitionNames = partitionNamesFuture.get(30000, 
TimeUnit.MILLISECONDS);
+            Assert.assertEquals(partitionNames.size(), defaultPartition);
+        }finally {
+            
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+            
pulsar.getConfiguration().setDefaultNumPartitions(defaultNumPartitions);
+        }
+    }
+
     @Test
     public void testRegexSubscription() throws Exception {
         @Cleanup

Reply via email to