This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c50866fbc1 [fix] [broker] Fast fix infinite HTTP call 
getSubscriptions caused by wrong topicName (#20131)
0c50866fbc1 is described below

commit 0c50866fbc18525f82a04c3a918628b8b50a4de8
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 23 09:41:28 2023 +0800

    [fix] [broker] Fast fix infinite HTTP call getSubscriptions caused by wrong 
topicName (#20131)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  32 +++--
 ...ameForInfiniteHttpCallGetSubscriptionsTest.java | 143 +++++++++++++++++++++
 2 files changed, 167 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7347d6dbf20..6b163ae0446 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
+import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.github.zafarkhaja.semver.Version;
@@ -1171,6 +1172,21 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> 
pulsar().getBrokerService().deleteTopic(topicName.toString(), force));
     }
 
+    /**
+     * A known bug causes Pulsar to misidentify 
"tp-partition-0-DLQ-partition-0" as "tp-partition-0-DLQ".
+     * For details, see PR 
https://github.com/apache/pulsar/pull/19841.
+     * This method is a quick fix and will be removed from the master branch after 
#19841 and PIP 263 are done.
+     */
+    private boolean isUnexpectedTopicName(PartitionedTopicMetadata 
topicMetadata) {
+        if (!topicName.toString().contains(PARTITIONED_TOPIC_SUFFIX)){
+            return false;
+        }
+        if (topicMetadata.partitions <= 0){
+            return false;
+        }
+        return 
topicName.getPartition(0).toString().equals(topicName.toString());
+    }
+
     protected void internalGetSubscriptions(AsyncResponse asyncResponse, 
boolean authoritative) {
         CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
@@ -1188,7 +1204,7 @@ public class PersistentTopicsBase extends AdminResource {
                     } else {
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                                 .thenAccept(partitionMetadata -> {
-                            if (partitionMetadata.partitions > 0) {
+                            if (partitionMetadata.partitions > 0 && 
!isUnexpectedTopicName(partitionMetadata)) {
                                 try {
                                     final Set<String> subscriptions =
                                             Collections.newSetFromMap(
@@ -3716,7 +3732,7 @@ public class PersistentTopicsBase extends AdminResource {
                             .thenCompose(metadata -> {
                                 if (metadata.partitions > 0) {
                                     return 
validateTopicOwnershipAsync(TopicName.get(topicName.toString()
-                                    + TopicName.PARTITIONED_TOPIC_SUFFIX + 0), 
authoritative);
+                                    + PARTITIONED_TOPIC_SUFFIX + 0), 
authoritative);
                                 } else {
                                     return 
validateTopicOwnershipAsync(topicName, authoritative);
                                 }
@@ -4543,7 +4559,7 @@ public class PersistentTopicsBase extends AdminResource {
     private CompletableFuture<Void> validatePartitionTopicUpdateAsync(String 
topicName, int numberOfPartition) {
         return internalGetListAsync().thenCompose(existingTopicList -> {
             TopicName partitionTopicName = TopicName.get(domain(), 
namespaceName, topicName);
-            String prefix = partitionTopicName.getPartitionedTopicName() + 
TopicName.PARTITIONED_TOPIC_SUFFIX;
+            String prefix = partitionTopicName.getPartitionedTopicName() + 
PARTITIONED_TOPIC_SUFFIX;
             return getPartitionedTopicMetadataAsync(partitionTopicName, false, 
false)
                     .thenAccept(metadata -> {
                         int oldPartition = metadata.partitions;
@@ -4551,8 +4567,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (existingTopicName.startsWith(prefix)) {
                                 try {
                                     long suffix = 
Long.parseLong(existingTopicName.substring(
-                                            
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
-                                                    + 
TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
+                                            
existingTopicName.indexOf(PARTITIONED_TOPIC_SUFFIX)
+                                                    + 
PARTITIONED_TOPIC_SUFFIX.length()));
                                     // Skip partition of partitioned topic by 
making sure
                                     // the numeric suffix greater than old 
partition number.
                                     if (suffix >= oldPartition && suffix <= 
(long) numberOfPartition) {
@@ -4593,12 +4609,12 @@ public class PersistentTopicsBase extends AdminResource 
{
      */
     private CompletableFuture<Void> validateNonPartitionTopicNameAsync(String 
topicName) {
         CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
-        if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
+        if (topicName.contains(PARTITIONED_TOPIC_SUFFIX)) {
             try {
                 // First check if what's after suffix "-partition-" is number 
or not, if not number then can create.
-                int partitionIndex = 
topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
+                int partitionIndex = 
topicName.indexOf(PARTITIONED_TOPIC_SUFFIX);
                 long suffix = Long.parseLong(topicName.substring(partitionIndex
-                        + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
+                        + PARTITIONED_TOPIC_SUFFIX.length()));
                 TopicName partitionTopicName = TopicName.get(domain(),
                         namespaceName, topicName.substring(0, partitionIndex));
                 ret = getPartitionedTopicMetadataAsync(partitionTopicName, 
false, false)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
new file mode 100644
index 00000000000..2efc4f4e780
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends 
ProducerConsumerBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        conf.setDefaultNumPartitions(1);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+        final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
+        final String partitionedTopicName = 
"persistent://my-property/my-ns/tp1_" + randomStr;
+        final String topic_p0 = partitionedTopicName + 
TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
+        final String subscriptionName = "sub1";
+        final String topicDLQ = topic_p0 + "-" + subscriptionName + "-DLQ";
+
+        admin.topics().createPartitionedTopic(partitionedTopicName, 2);
+
+        // Do test.
+        ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, 
topicDLQ, subscriptionName);
+        admin.topics().getSubscriptions(topicDLQ);
+
+        // cleanup.
+        pcEntry.consumer.close();
+        pcEntry.producer.close();
+        admin.topics().deletePartitionedTopic(partitionedTopicName);
+    }
+
+    @Test
+    public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+        final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
+        final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0-abc";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        // Do test.
+        admin.topics().getSubscriptions(topicName);
+
+        // cleanup.
+        producer.close();
+    }
+
+    @Test
+    public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+        final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
+        final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        // Do test.
+        admin.topics().getSubscriptions(topicName);
+
+        // cleanup.
+        producer.close();
+    }
+
+    @AllArgsConstructor
+    private static class ProducerAndConsumerEntry {
+        private Producer<String> producer;
+        private Consumer<String> consumer;
+    }
+
+    private ProducerAndConsumerEntry triggerDLQCreated(String topicName, 
String DLQName, String subscriptionName) throws Exception {
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(DLQName).maxRedeliverCount(2).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+        // send messages.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage()
+                    .value("value-" + i)
+                    .sendAsync();
+        }
+        producer.flush();
+        // trigger the DLQ created.
+        for (int i = 0; i < 20; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
+            } else {
+                break;
+            }
+        }
+
+        return new ProducerAndConsumerEntry(producer, consumer);
+    }
+}
\ No newline at end of file

Reply via email to