This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new d630d7d0426 [fix][client] Fix subscribing pattern topics through Proxy not working (#20739)
d630d7d0426 is described below
commit d630d7d042682a16ba68681260e81d38a8ec893f
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 11 10:10:33 2023 +0800
[fix][client] Fix subscribing pattern topics through Proxy not working (#20739)
---
.../broker/service/InactiveTopicDeleteTest.java | 11 +++------
.../pulsar/client/impl/TopicListWatcher.java | 2 +-
.../pulsar/client/impl/TopicListWatcherTest.java | 10 ++++++--
.../org/apache/pulsar/proxy/server/ProxyTest.java | 28 ++++++++++++++++++++++
4 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 5d5a66efc23..4d3b9c1b19e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -35,8 +35,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
@@ -356,12 +354,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.topics().skipAllMessages(topic, "sub");
Awaitility.await().untilAsserted(() -> {
- Assert.assertFalse(consumer.isConnected());
- final List<ConsumerImpl> consumers = ((MultiTopicsConsumerImpl)
consumer2).getConsumers();
- consumers.forEach(c -> Assert.assertFalse(c.isConnected()));
- Assert.assertFalse(consumer2.isConnected());
-
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic));
-
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic2));
+ final List<String> topics = admin.topics().getList("prop/ns-abc");
+ Assert.assertFalse(topics.contains(topic));
+ Assert.assertFalse(topics.contains(topic2));
});
consumer.close();
consumer2.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 5ee47c929e4..7b67c195d30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -61,7 +61,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
PulsarClientImpl client, Pattern topicsPattern,
long watcherId,
NamespaceName namespace, String topicsHash,
CompletableFuture<TopicListWatcher> watcherFuture)
{
- super(client, null);
+ super(client, topicsPattern.pattern());
this.topicsChangeListener = topicsChangeListener;
this.name = "Watcher(" + topicsPattern + ")";
this.connectionHandler = new ConnectionHandler(this,
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 1d245350c82..ebbcc435da2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import
org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -53,15 +55,19 @@ public class TopicListWatcherTest {
when(client.getConfiguration()).thenReturn(new
ClientConfigurationData());
clientCnxFuture = new CompletableFuture<>();
when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
+ Timer timer = new HashedWheelTimer();
+ when(client.timer()).thenReturn(timer);
+ String topic = "persistent://tenant/ns/topic\\d+";
+ when(client.getConnection(topic)).thenReturn(clientCnxFuture);
watcherFuture = new CompletableFuture<>();
watcher = new TopicListWatcher(listener, client,
- Pattern.compile("persistent://tenant/ns/topic\\d+"), 7,
+ Pattern.compile(topic), 7,
NamespaceName.get("tenant/ns"), null, watcherFuture);
}
@Test
public void testWatcherGrabsConnection() {
- verify(client).getConnectionToServiceUrl();
+ verify(client).getConnection(any());
}
@Test
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index d1f30c13a93..a0c4d22cc2a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -258,6 +258,34 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test(timeOut = 60_000)
+ public void testRegexSubscriptionWithTopicDiscovery() throws Exception {
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
+ String subName = "regex-proxy-test-" + System.currentTimeMillis();
+ String regexSubscriptionPattern = "persistent://sample/test/local/.*";
+ try (Consumer<byte[]> consumer = client.newConsumer()
+ .topicsPattern(regexSubscriptionPattern)
+ .subscriptionName(subName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .patternAutoDiscoveryPeriod(10, TimeUnit.MINUTES)
+ .subscribe()) {
+ final int topics = 10;
+ final String topicPrefix =
"persistent://sample/test/local/regex-topic-";
+ for (int i = 0; i < topics; i++) {
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic(topicPrefix + i)
+ .create();
+ producer.send(("" + i).getBytes(UTF_8));
+ producer.close();
+ }
+ for (int i = 0; i < topics; i++) {
+ Message<byte[]> msg = consumer.receive();
+ assertEquals(topicPrefix + new String(msg.getValue(), UTF_8),
msg.getTopicName());
+ }
+ }
+ }
+
@Test
public void testGetSchema() throws Exception {
@Cleanup