This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b72eec  Issue 2119: TopicPatternSubscription doesn't work through proxy (#2176)
5b72eec is described below

commit 5b72eecb57b56e42f1159ac228de0bc17887a869
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 16 21:51:59 2018 -0700

    Issue 2119: TopicPatternSubscription doesn't work through proxy (#2176)
    
    * Fix regex
    
    * Implement GetTopicsOfNamespace on Proxy
    
    * remove unneeded import
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 104 +++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConnection.java       |   8 ++
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  49 ++++++++++
 3 files changed, 161 insertions(+)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index bed7ed7..957cff8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 import org.apache.pulsar.common.api.Commands;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -54,6 +55,11 @@ public class LookupProxyHandler {
             .build("pulsar_proxy_partitions_metadata_requests", "Counter of 
partitions metadata requests").create()
             .register();
 
+    private static final Counter getTopicsOfNamespaceRequestss = Counter
+            .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter 
of getTopicsOfNamespace requests")
+            .create()
+            .register();
+
     static final Counter rejectedLookupRequests = 
Counter.build("pulsar_proxy_rejected_lookup_requests",
             "Counter of topic lookup requests rejected due to 
throttling").create().register();
 
@@ -62,6 +68,11 @@ public class LookupProxyHandler {
                     "Counter of partitions metadata requests rejected due to 
throttling")
             .create().register();
 
+    static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
+            .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
+                    "Counter of getTopicsOfNamespace requests rejected due to 
throttling")
+            .create().register();
+
     public LookupProxyHandler(ProxyService proxy, ProxyConnection 
proxyConnection) {
         this.service = proxy;
         this.proxyConnection = proxyConnection;
@@ -246,5 +257,98 @@ public class LookupProxyHandler {
         }
     }
 
+    public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace) {
+        getTopicsOfNamespaceRequestss.inc();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
+        }
+
+        final long requestId = commandGetTopicsOfNamespace.getRequestId();
+
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
+            this.service.getLookupRequestSemaphore().release();
+        } else {
+            rejectedGetTopicsOfNamespaceRequests.inc();
+            if (log.isDebugEnabled()) {
+                log.debug("GetTopicsOfNamespace Request ID {} from {} rejected 
- {}.", requestId, clientAddress,
+                    throttlingErrorMessage);
+            }
+            proxyConnection.ctx().writeAndFlush(Commands.newError(
+                requestId, ServerError.ServiceNotReady, throttlingErrorMessage
+            ));
+        }
+    }
+
+
+    private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace,
+                                            long clientRequestId) {
+        String serviceUrl;
+        if (isBlank(brokerServiceURL)) {
+            ServiceLookupData availableBroker;
+            try {
+                availableBroker = service.getDiscoveryProvider().nextBroker();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
+                proxyConnection.ctx().writeAndFlush(Commands.newError(
+                    clientRequestId, ServerError.ServiceNotReady, 
e.getMessage()
+                ));
+                return;
+            }
+            serviceUrl = this.connectWithTLS ?
+                availableBroker.getPulsarServiceUrlTls() : 
availableBroker.getPulsarServiceUrl();
+        } else {
+            serviceUrl = this.connectWithTLS ?
+                service.getConfiguration().getBrokerServiceURLTLS() : 
service.getConfiguration().getBrokerServiceURL();
+        }
+        performGetTopicsOfNamespace(clientRequestId, 
commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10);
+    }
+
+    private void performGetTopicsOfNamespace(long clientRequestId,
+                                             String namespaceName,
+                                             String brokerServiceUrl,
+                                             int numberOfRetries) {
+        if (numberOfRetries == 0) {
+            
proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, 
ServerError.ServiceNotReady,
+                    "Reached max number of redirections"));
+            return;
+        }
+
+        URI brokerURI;
+        try {
+            brokerURI = new URI(brokerServiceUrl);
+        } catch (URISyntaxException e) {
+            proxyConnection.ctx().writeAndFlush(
+                    Commands.newError(clientRequestId, 
ServerError.MetadataError, e.getMessage()));
+            return;
+        }
+
+        InetSocketAddress addr = 
InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
+        if (log.isDebugEnabled()) {
+            log.debug("Getting connections to '{}' for getting 
TopicsOfNamespace '{}' with clientReq Id '{}'",
+                addr, namespaceName, clientRequestId);
+        }
+        
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
+            // Connected to backend broker
+            long requestId = proxyConnection.newRequestId();
+            ByteBuf command;
+            command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, 
requestId);
+            clientCnx.newGetTopicsOfNamespace(command, 
requestId).thenAccept(topicList ->
+                proxyConnection.ctx().writeAndFlush(
+                    Commands.newGetTopicsOfNamespaceResponse(topicList, 
clientRequestId))
+            ).exceptionally(ex -> {
+                log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", 
clientAddress, namespaceName, ex.getMessage());
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newError(clientRequestId, 
ServerError.ServiceNotReady, ex.getMessage()));
+                return null;
+            });
+        }).exceptionally(ex -> {
+            // Failed to connect to backend broker
+            proxyConnection.ctx().writeAndFlush(
+                    Commands.newError(clientRequestId, 
ServerError.ServiceNotReady, ex.getMessage()));
+            return null;
+        });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(LookupProxyHandler.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 08b5b2b..a57cf88 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
@@ -237,6 +238,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
     }
 
+    @Override
+    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace 
commandGetTopicsOfNamespace) {
+        checkArgument(state == State.ProxyLookupRequests);
+
+        
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
+    }
+
     /**
      * handles discovery request from client ands sends next active broker 
address
      */
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 9e856c1..391a5bf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
 
 import java.util.concurrent.TimeUnit;
 
@@ -35,12 +37,16 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class ProxyTest extends MockedPulsarServiceBaseTest {
 
+    private static final Logger log = LoggerFactory.getLogger(ProxyTest.class);
+
     private final String DUMMY_VALUE = "DUMMY_VALUE";
 
     private ProxyService proxyService;
@@ -143,4 +149,47 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         client.close();
     }
 
+    @Test
+    public void testRegexSubscription() throws Exception {
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
+            .connectionsPerBroker(5).ioThreads(5).build();
+
+        // create two topics by subscribing to a topic and closing it
+        try (Consumer<byte[]> ignored = client.newConsumer()
+            .topic("persistent://sample/test/local/topic1")
+            .subscriptionName("ignored")
+            .subscribe()) {
+        }
+        try (Consumer<byte[]> ignored = client.newConsumer()
+            .topic("persistent://sample/test/local/topic2")
+            .subscriptionName("ignored")
+            .subscribe()) {
+        }
+
+        // make sure regex subscription
+        String regexSubscriptionPattern = 
"persistent://sample/test/local/topic.*";
+        log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
+        try (Consumer<byte[]> consumer = client.newConsumer()
+            .topicsPattern(regexSubscriptionPattern)
+            .subscriptionName("regex-sub")
+            .subscribe()) {
+            log.info("Successfully subscribe to topics using regex {}", 
regexSubscriptionPattern);
+
+            final int numMessages = 20;
+
+            try (Producer<byte[]> producer = client.newProducer()
+                .topic("persistent://sample/test/local/topic1")
+                .create()) {
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(("message-" + i).getBytes(UTF_8));
+                }
+            }
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<byte[]> msg = consumer.receive();
+                assertEquals("message-" + i, new String(msg.getValue(), 
UTF_8));
+            }
+        }
+    }
+
 }

Reply via email to