This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new d630d7d0426 [fix][client] Fix subscribing pattern topics through Proxy not working (#20739)
d630d7d0426 is described below

commit d630d7d042682a16ba68681260e81d38a8ec893f
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 11 10:10:33 2023 +0800

    [fix][client] Fix subscribing pattern topics through Proxy not working (#20739)
---
 .../broker/service/InactiveTopicDeleteTest.java    | 11 +++------
 .../pulsar/client/impl/TopicListWatcher.java       |  2 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   | 10 ++++++--
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 28 ++++++++++++++++++++++
 4 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 5d5a66efc23..4d3b9c1b19e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -35,8 +35,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.TopicVersion;
@@ -356,12 +354,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
 
         admin.topics().skipAllMessages(topic, "sub");
         Awaitility.await().untilAsserted(() -> {
-            Assert.assertFalse(consumer.isConnected());
-            final List<ConsumerImpl> consumers = ((MultiTopicsConsumerImpl) consumer2).getConsumers();
-            consumers.forEach(c -> Assert.assertFalse(c.isConnected()));
-            Assert.assertFalse(consumer2.isConnected());
-            Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic));
-            Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic2));
+            final List<String> topics = admin.topics().getList("prop/ns-abc");
+            Assert.assertFalse(topics.contains(topic));
+            Assert.assertFalse(topics.contains(topic2));
         });
         consumer.close();
         consumer2.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 5ee47c929e4..7b67c195d30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -61,7 +61,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
                             PulsarClientImpl client, Pattern topicsPattern, long watcherId,
                             NamespaceName namespace, String topicsHash,
                             CompletableFuture<TopicListWatcher> watcherFuture) {
-        super(client, null);
+        super(client, topicsPattern.pattern());
         this.topicsChangeListener = topicsChangeListener;
         this.name = "Watcher(" + topicsPattern + ")";
         this.connectionHandler = new ConnectionHandler(this,
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 1d245350c82..ebbcc435da2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -53,15 +55,19 @@ public class TopicListWatcherTest {
         when(client.getConfiguration()).thenReturn(new ClientConfigurationData());
         clientCnxFuture = new CompletableFuture<>();
         when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
+        Timer timer = new HashedWheelTimer();
+        when(client.timer()).thenReturn(timer);
+        String topic = "persistent://tenant/ns/topic\\d+";
+        when(client.getConnection(topic)).thenReturn(clientCnxFuture);
         watcherFuture = new CompletableFuture<>();
         watcher = new TopicListWatcher(listener, client,
-                Pattern.compile("persistent://tenant/ns/topic\\d+"), 7,
+                Pattern.compile(topic), 7,
                 NamespaceName.get("tenant/ns"), null, watcherFuture);
     }
 
     @Test
     public void testWatcherGrabsConnection() {
-        verify(client).getConnectionToServiceUrl();
+        verify(client).getConnection(any());
     }
 
     @Test
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index d1f30c13a93..a0c4d22cc2a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -258,6 +258,34 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 60_000)
+    public void testRegexSubscriptionWithTopicDiscovery() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
+        String subName = "regex-proxy-test-" + System.currentTimeMillis();
+        String regexSubscriptionPattern = "persistent://sample/test/local/.*";
+        try (Consumer<byte[]> consumer = client.newConsumer()
+                .topicsPattern(regexSubscriptionPattern)
+                .subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .patternAutoDiscoveryPeriod(10, TimeUnit.MINUTES)
+                .subscribe()) {
+            final int topics = 10;
+            final String topicPrefix = "persistent://sample/test/local/regex-topic-";
+            for (int i = 0; i < topics; i++) {
+                Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                        .topic(topicPrefix + i)
+                        .create();
+                producer.send(("" + i).getBytes(UTF_8));
+                producer.close();
+            }
+            for (int i = 0; i < topics; i++) {
+                Message<byte[]> msg = consumer.receive();
+                assertEquals(topicPrefix + new String(msg.getValue(), UTF_8), msg.getTopicName());
+            }
+        }
+    }
+
     @Test
     public void testGetSchema() throws Exception {
         @Cleanup

Reply via email to