merlimat closed pull request #2176: Issue 2119: TopicPatternSubscription
doesn't work through proxy
URL: https://github.com/apache/incubator-pulsar/pull/2176
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index bed7ed7a84..957cff8d93 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -26,6 +26,7 @@
import java.net.URISyntaxException;
import org.apache.pulsar.common.api.Commands;
+import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
@@ -54,6 +55,11 @@
.build("pulsar_proxy_partitions_metadata_requests", "Counter of
partitions metadata requests").create()
.register();
+ /**
+  * Counter registered under the name "pulsar_proxy_get_topics_of_namespace_requests".
+  * It is incremented once for every GetTopicsOfNamespace command handed to
+  * handleGetTopicsOfNamespace, before any throttling decision is made.
+  *
+  * NOTE(review): the field name ends in a doubled "s"; renaming it also requires
+  * updating the inc() call site in handleGetTopicsOfNamespace.
+  */
+ private static final Counter getTopicsOfNamespaceRequestss = Counter
+ .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter
of getTopicsOfNamespace requests")
+ .create()
+ .register();
+
static final Counter rejectedLookupRequests =
Counter.build("pulsar_proxy_rejected_lookup_requests",
"Counter of topic lookup requests rejected due to
throttling").create().register();
@@ -62,6 +68,11 @@
"Counter of partitions metadata requests rejected due to
throttling")
.create().register();
+ /**
+  * Counter registered under the name
+  * "pulsar_proxy_rejected_get_topics_of_namespace_requests". It is incremented when a
+  * GetTopicsOfNamespace request is turned away because no lookup-semaphore permit
+  * could be acquired.
+  */
+ static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
+ .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
+ "Counter of getTopicsOfNamespace requests rejected due to
throttling")
+ .create().register();
+
public LookupProxyHandler(ProxyService proxy, ProxyConnection
proxyConnection) {
this.service = proxy;
this.proxyConnection = proxyConnection;
@@ -246,5 +257,98 @@ private void
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
}
}
+    /**
+     * Entry point for a GetTopicsOfNamespace command received from a client.
+     *
+     * <p>The request is counted, then admitted only if a permit can be taken from the
+     * service's lookup-request semaphore. Admitted requests are forwarded to the private
+     * overload; rejected requests are counted and answered with a
+     * {@code ServiceNotReady} error carrying the throttling message.
+     *
+     * <p>The permit covers only the synchronous dispatch: it is released as soon as the
+     * private overload returns, not when the broker eventually answers.
+     *
+     * @param commandGetTopicsOfNamespace the command sent by the client; its request id is
+     *                                    echoed back in the response or error
+     */
+    public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+        getTopicsOfNamespaceRequestss.inc();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
+        }
+
+        final long requestId = commandGetTopicsOfNamespace.getRequestId();
+
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            try {
+                handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
+            } finally {
+                // Release in finally: if the dispatch throws, the permit must still be
+                // returned, otherwise every failure permanently shrinks the lookup quota.
+                this.service.getLookupRequestSemaphore().release();
+            }
+        } else {
+            rejectedGetTopicsOfNamespaceRequests.inc();
+            if (log.isDebugEnabled()) {
+                log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, clientAddress,
+                        throttlingErrorMessage);
+            }
+            proxyConnection.ctx().writeAndFlush(Commands.newError(
+                    requestId, ServerError.ServiceNotReady, throttlingErrorMessage
+            ));
+        }
+    }
+
+
+    /**
+     * Chooses the broker service URL to query and starts the GetTopicsOfNamespace call.
+     *
+     * <p>When {@code brokerServiceURL} is not blank, the URL comes from the proxy
+     * configuration; otherwise the discovery provider is asked for the next broker.
+     * In both cases the TLS variant is used when {@code connectWithTLS} is set. If the
+     * discovery provider throws, the client receives a {@code ServiceNotReady} error and
+     * nothing is forwarded.
+     *
+     * @param commandGetTopicsOfNamespace the client's command; only its namespace is read here
+     * @param clientRequestId             request id to use in the reply to the client
+     */
+    private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
+                                            long clientRequestId) {
+        String targetUrl;
+        if (!isBlank(brokerServiceURL)) {
+            // brokerServiceURL is set: take the broker URL from the proxy configuration.
+            if (this.connectWithTLS) {
+                targetUrl = service.getConfiguration().getBrokerServiceURLTLS();
+            } else {
+                targetUrl = service.getConfiguration().getBrokerServiceURL();
+            }
+        } else {
+            // brokerServiceURL is blank: ask the discovery provider for a broker.
+            ServiceLookupData broker;
+            try {
+                broker = service.getDiscoveryProvider().nextBroker();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newError(clientRequestId, ServerError.ServiceNotReady, e.getMessage()));
+                return;
+            }
+            if (this.connectWithTLS) {
+                targetUrl = broker.getPulsarServiceUrlTls();
+            } else {
+                targetUrl = broker.getPulsarServiceUrl();
+            }
+        }
+
+        // The final argument is the retry budget handed to performGetTopicsOfNamespace.
+        performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), targetUrl, 10);
+    }
+
+ private void performGetTopicsOfNamespace(long clientRequestId,
+ String namespaceName,
+ String brokerServiceUrl,
+ int numberOfRetries) {
+ if (numberOfRetries == 0) {
+
proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId,
ServerError.ServiceNotReady,
+ "Reached max number of redirections"));
+ return;
+ }
+
+ URI brokerURI;
+ try {
+ brokerURI = new URI(brokerServiceUrl);
+ } catch (URISyntaxException e) {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.MetadataError, e.getMessage()));
+ return;
+ }
+
+ InetSocketAddress addr =
InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
+ if (log.isDebugEnabled()) {
+ log.debug("Getting connections to '{}' for getting
TopicsOfNamespace '{}' with clientReq Id '{}'",
+ addr, namespaceName, clientRequestId);
+ }
+
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx ->
{
+ // Connected to backend broker
+ long requestId = proxyConnection.newRequestId();
+ ByteBuf command;
+ command = Commands.newGetTopicsOfNamespaceRequest(namespaceName,
requestId);
+ clientCnx.newGetTopicsOfNamespace(command,
requestId).thenAccept(topicList ->
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newGetTopicsOfNamespaceResponse(topicList,
clientRequestId))
+ ).exceptionally(ex -> {
+ log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
clientAddress, namespaceName, ex.getMessage());
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ // Failed to connect to backend broker
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId,
ServerError.ServiceNotReady, ex.getMessage()));
+ return null;
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(LookupProxyHandler.class);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 08b5b2b8ea..a57cf88308 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -39,6 +39,7 @@
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
+import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
@@ -237,6 +238,13 @@ protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
lookupProxyHandler.handlePartitionMetadataResponse(partitionMetadata);
}
+ @Override
+ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace
commandGetTopicsOfNamespace) {
+ checkArgument(state == State.ProxyLookupRequests);
+
+
lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
+ }
+
/**
* handles discovery request from client and sends next active broker address
*/
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 9e856c1870..391a5bfb43 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -20,7 +20,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
@@ -35,12 +37,16 @@
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class ProxyTest extends MockedPulsarServiceBaseTest {
+ private static final Logger log = LoggerFactory.getLogger(ProxyTest.class);
+
private final String DUMMY_VALUE = "DUMMY_VALUE";
private ProxyService proxyService;
@@ -143,4 +149,47 @@ public void testPartitions() throws Exception {
client.close();
}
+    /**
+     * Checks that a topics-pattern (regex) subscription works for a client connected
+     * through the proxy: two topics are created, a regex consumer subscribes to both,
+     * and messages published to one of them are received in order.
+     *
+     * @throws Exception if any client, producer or consumer operation fails
+     */
+    @Test
+    public void testRegexSubscription() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+                .connectionsPerBroker(5).ioThreads(5).build();
+
+        try {
+            // create two topics by subscribing to a topic and closing it
+            try (Consumer<byte[]> ignored = client.newConsumer()
+                    .topic("persistent://sample/test/local/topic1")
+                    .subscriptionName("ignored")
+                    .subscribe()) {
+                // nothing to do: subscribing is enough to create the topic
+            }
+            try (Consumer<byte[]> ignored = client.newConsumer()
+                    .topic("persistent://sample/test/local/topic2")
+                    .subscriptionName("ignored")
+                    .subscribe()) {
+                // nothing to do: subscribing is enough to create the topic
+            }
+
+            // make sure regex subscription works through the proxy
+            String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
+            log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
+            try (Consumer<byte[]> consumer = client.newConsumer()
+                    .topicsPattern(regexSubscriptionPattern)
+                    .subscriptionName("regex-sub")
+                    .subscribe()) {
+                log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
+
+                final int numMessages = 20;
+
+                try (Producer<byte[]> producer = client.newProducer()
+                        .topic("persistent://sample/test/local/topic1")
+                        .create()) {
+                    for (int i = 0; i < numMessages; i++) {
+                        producer.send(("message-" + i).getBytes(UTF_8));
+                    }
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<byte[]> msg = consumer.receive();
+                    // TestNG's assertEquals takes (actual, expected); the reverse order
+                    // produces a misleading failure message.
+                    assertEquals(new String(msg.getValue(), UTF_8), "message-" + i);
+                }
+            }
+        } finally {
+            // Close the client even when an assertion fails, so its connections and
+            // IO threads do not leak into the tests that follow.
+            client.close();
+        }
+    }
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services