This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new eff0a29c9f9 [fix] [broker] Fast fix infinite HTTP call
getSubscriptions caused by wrong topicName (#20131)
eff0a29c9f9 is described below
commit eff0a29c9f92c0c406b2edf090a8929b2435d8cb
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 23 09:41:28 2023 +0800
[fix] [broker] Fast fix infinite HTTP call getSubscriptions caused by wrong
topicName (#20131)
(cherry picked from commit 0c50866fbc18525f82a04c3a918628b8b50a4de8)
---
.../broker/admin/impl/PersistentTopicsBase.java | 17 ++-
...ameForInfiniteHttpCallGetSubscriptionsTest.java | 142 +++++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a426aa83ff3..d596bbfd06c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1119,6 +1119,21 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ /**
+ * Quick-fix guard for a known topic-name bug: Pulsar misidentifies
+ * "tp-partition-0-DLQ-partition-0" as "tp-partition-0-DLQ".
+ * See PR https://github.com/apache/pulsar/pull/19841 for the details.
+ * This method is temporary and will be removed from the master branch
+ * once #19841 and PIP 263 are done.
+ *
+ * <p>Returns {@code false} when the string form of {@code topicName} does
+ * not contain {@code TopicName.PARTITIONED_TOPIC_SUFFIX}, or when
+ * {@code topicMetadata.partitions <= 0}. Otherwise returns {@code true}
+ * only if {@code topicName.getPartition(0)} has the same string form as
+ * {@code topicName} itself.
+ *
+ * @param topicMetadata partitioned-topic metadata looked up for the topic
+ * @return {@code true} if the current topic name is in the unexpected
+ * form described above, {@code false} otherwise
+ */
+ private boolean isUnexpectedTopicName(PartitionedTopicMetadata
topicMetadata) {
+ if
(!topicName.toString().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)){
+ return false;
+ }
+ if (topicMetadata.partitions <= 0){
+ return false;
+ }
+ return
topicName.getPartition(0).toString().equals(topicName.toString());
+ }
+
protected void internalGetSubscriptions(AsyncResponse asyncResponse,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
@@ -1136,7 +1151,7 @@ public class PersistentTopicsBase extends AdminResource {
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
.thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
+ if (partitionMetadata.partitions > 0 &&
!isUnexpectedTopicName(partitionMetadata)) {
try {
final Set<String> subscriptions =
Collections.newSetFromMap(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
new file mode 100644
index 00000000000..f9af9dc5009
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends
ProducerConsumerBase {
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setAllowAutoTopicCreationType("partitioned");
+ conf.setDefaultNumPartitions(1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+ final String randomStr = UUID.randomUUID().toString().replaceAll("-",
"");
+ final String partitionedTopicName =
"persistent://my-property/my-ns/tp1_" + randomStr;
+ final String topic_p0 = partitionedTopicName +
TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
+ final String subscriptionName = "sub1";
+ final String topicDLQ = topic_p0 + "-" + subscriptionName + "-DLQ";
+
+ admin.topics().createPartitionedTopic(partitionedTopicName, 2);
+
+ // Do test: trigger DLQ creation, then list subscriptions on the DLQ topic name.
+ ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0,
topicDLQ, subscriptionName);
+ admin.topics().getSubscriptions(topicDLQ);
+
+ // Cleanup: close the clients and delete the partitioned topic.
+ pcEntry.consumer.close();
+ pcEntry.producer.close();
+ admin.topics().deletePartitionedTopic(partitionedTopicName);
+ }
+
+ @Test
+ public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+ final String randomStr = UUID.randomUUID().toString().replaceAll("-",
"");
+ final String topicName = "persistent://my-property/my-ns/tp1_" +
randomStr + "-partition-0-abc";
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+
+ // Do test: list subscriptions on a topic name with extra text after "-partition-0".
+ admin.topics().getSubscriptions(topicName);
+
+ // Cleanup: close the producer.
+ producer.close();
+ }
+
+ @Test
+ public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+ final String randomStr = UUID.randomUUID().toString().replaceAll("-",
"");
+ final String topicName = "persistent://my-property/my-ns/tp1_" +
randomStr + "-partition-0";
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+
+ // Do test: list subscriptions on a topic name that ends with "-partition-0".
+ admin.topics().getSubscriptions(topicName);
+
+ // Cleanup: close the producer.
+ producer.close();
+ }
+
+ @AllArgsConstructor
+ private static class ProducerAndConsumerEntry {
+ private Producer<String> producer;
+ private Consumer<String> consumer;
+ }
+
+ private ProducerAndConsumerEntry triggerDLQCreated(String topicName,
String DLQName, String subscriptionName) throws Exception {
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(DLQName).maxRedeliverCount(2).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ // Send 5 messages asynchronously; they are flushed after the loop.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage()
+ .value("value-" + i)
+ .sendAsync();
+ }
+ producer.flush();
+ // Trigger DLQ creation: reconsume each message later; stop after 20 receives or the first null.
+ for (int i = 0; i < 20; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
+ } else {
+ break;
+ }
+ }
+
+ return new ProducerAndConsumerEntry(producer, consumer);
+ }
+}
\ No newline at end of file