This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b72eec Issue 2119: TopicPatternSubscription doesn't work through proxy (#2176)
5b72eec is described below
commit 5b72eecb57b56e42f1159ac228de0bc17887a869
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 16 21:51:59 2018 -0700
Issue 2119: TopicPatternSubscription doesn't work through proxy (#2176)
* Fix regex
* Implement GetTopicsOfNamespace on Proxy
* remove unneeded import
---
.../pulsar/proxy/server/LookupProxyHandler.java | 104 +++++++++++++++++++++
.../pulsar/proxy/server/ProxyConnection.java | 8 ++
.../org/apache/pulsar/proxy/server/ProxyTest.java | 49 ++++++++++
3 files changed, 161 insertions(+)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index bed7ed7..957cff8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import org.apache.pulsar.common.api.Commands;
+import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -54,6 +55,11 @@ public class LookupProxyHandler {
.build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create()
.register();
+ private static final Counter getTopicsOfNamespaceRequestss = Counter
+ .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests")
+ .create()
+ .register();
+
static final Counter rejectedLookupRequests =
Counter.build("pulsar_proxy_rejected_lookup_requests",
"Counter of topic lookup requests rejected due to
throttling").create().register();
@@ -62,6 +68,11 @@ public class LookupProxyHandler {
"Counter of partitions metadata requests rejected due to
throttling")
.create().register();
+ static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
+ .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
+ "Counter of getTopicsOfNamespace requests rejected due to throttling")
+ .create().register();
+
public LookupProxyHandler(ProxyService proxy, ProxyConnection
proxyConnection) {
this.service = proxy;
this.proxyConnection = proxyConnection;
@@ -246,5 +257,98 @@ public class LookupProxyHandler {
}
}
+ public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace
commandGetTopicsOfNamespace) {
+ getTopicsOfNamespaceRequestss.inc();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
+ }
+
+ final long requestId = commandGetTopicsOfNamespace.getRequestId();
+
+ if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+ handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
+ this.service.getLookupRequestSemaphore().release();
+ } else {
+ rejectedGetTopicsOfNamespaceRequests.inc();
+ if (log.isDebugEnabled()) {
+ log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, clientAddress,
+ throttlingErrorMessage);
+ }
+ proxyConnection.ctx().writeAndFlush(Commands.newError(
+ requestId, ServerError.ServiceNotReady, throttlingErrorMessage
+ ));
+ }
+ }
+
+
+ private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace
commandGetTopicsOfNamespace,
+ long clientRequestId) {
+ String serviceUrl;
+ if (isBlank(brokerServiceURL)) {
+ ServiceLookupData availableBroker;
+ try {
+ availableBroker = service.getDiscoveryProvider().nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}",
clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(Commands.newError(
+ clientRequestId, ServerError.ServiceNotReady,
e.getMessage()
+ ));
+ return;
+ }
+ serviceUrl = this.connectWithTLS ?
+ availableBroker.getPulsarServiceUrlTls() :
availableBroker.getPulsarServiceUrl();
+ } else {
+ serviceUrl = this.connectWithTLS ?
+ service.getConfiguration().getBrokerServiceURLTLS() :
service.getConfiguration().getBrokerServiceURL();
+ }
+ performGetTopicsOfNamespace(clientRequestId,
commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10);
+ }
+
+ private void performGetTopicsOfNamespace(long clientRequestId,
+ String namespaceName,
+ String brokerServiceUrl,
+ int numberOfRetries) {
+ if (numberOfRetries == 0) {
+ proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady,
+ "Reached max number of redirections"));
+ return;
+ }
+
+ URI brokerURI;
+ try {
+ brokerURI = new URI(brokerServiceUrl);
+ } catch (URISyntaxException e) {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.MetadataError, e.getMessage()));
+ return;
+ }
+
+ InetSocketAddress addr =
InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
+ if (log.isDebugEnabled()) {
+ log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'",
+ addr, namespaceName, clientRequestId);
+ }
+ proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+ // Connected to backend broker
+ long requestId = proxyConnection.newRequestId();
+ ByteBuf command;
+ command = Commands.newGetTopicsOfNamespaceRequest(namespaceName,
requestId);
+ clientCnx.newGetTopicsOfNamespace(command,
requestId).thenAccept(topicList ->
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newGetTopicsOfNamespaceResponse(topicList,
clientRequestId))
+ ).exceptionally(ex -> {
+ log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
clientAddress, namespaceName, ex.getMessage());
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ // Failed to connect to backend broker
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(LookupProxyHandler.class);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 08b5b2b..a57cf88 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
+import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
@@ -237,6 +238,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
}
+ @Override
+ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace
commandGetTopicsOfNamespace) {
+ checkArgument(state == State.ProxyLookupRequests);
+
+ lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
+ }
+
/**
* handles discovery request from client ands sends next active broker
address
*/
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 9e856c1..391a5bf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
@@ -35,12 +37,16 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class ProxyTest extends MockedPulsarServiceBaseTest {
+ private static final Logger log = LoggerFactory.getLogger(ProxyTest.class);
+
private final String DUMMY_VALUE = "DUMMY_VALUE";
private ProxyService proxyService;
@@ -143,4 +149,47 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
client.close();
}
+ @Test
+ public void testRegexSubscription() throws Exception {
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+ .connectionsPerBroker(5).ioThreads(5).build();
+
+ // create two topics by subscribing to a topic and closing it
+ try (Consumer<byte[]> ignored = client.newConsumer()
+ .topic("persistent://sample/test/local/topic1")
+ .subscriptionName("ignored")
+ .subscribe()) {
+ }
+ try (Consumer<byte[]> ignored = client.newConsumer()
+ .topic("persistent://sample/test/local/topic2")
+ .subscriptionName("ignored")
+ .subscribe()) {
+ }
+
+ // make sure regex subscription
+ String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
+ log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
+ try (Consumer<byte[]> consumer = client.newConsumer()
+ .topicsPattern(regexSubscriptionPattern)
+ .subscriptionName("regex-sub")
+ .subscribe()) {
+ log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
+
+ final int numMessages = 20;
+
+ try (Producer<byte[]> producer = client.newProducer()
+ .topic("persistent://sample/test/local/topic1")
+ .create()) {
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(("message-" + i).getBytes(UTF_8));
+ }
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ assertEquals("message-" + i, new String(msg.getValue(),
UTF_8));
+ }
+ }
+ }
+
}