This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c067ce2802 [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
3c067ce2802 is described below

commit 3c067ce28025e116146977118312a1471ba284f5
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Nov 10 13:30:05 2023 +0800

    [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
---
 .../pulsar/broker/service/BrokerService.java       | 14 +++++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 57 ++++++++++++++++++++++
 .../service/persistent/PersistentTopicTest.java    |  3 +-
 4 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4f64d5aab86..b9a8e74b9a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1075,10 +1075,22 @@ public class BrokerService implements Closeable {
                                             return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
                                                     properties, topicPolicies);
                                         }
-                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                        final String errorMsg =
+                                                String.format("Illegal topic 
partition name %s with max allowed "
+                                                        + "%d partitions", 
topicName, metadata.partitions);
+                                        log.warn(errorMsg);
+                                        return FutureUtil
+                                                .failedFuture(new 
BrokerServiceException.NotAllowedException(errorMsg));
                                     });
                         }
                         return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                    }).thenCompose(optionalTopic -> {
+                        if (!optionalTopic.isPresent() && createIfMissing) {
+                            log.warn("[{}] Try to recreate the topic with 
createIfMissing=true "
+                                    + "but the returned topic is empty", 
topicName);
+                            return getTopic(topicName, createIfMissing, 
properties);
+                        }
+                        return 
CompletableFuture.completedFuture(optionalTopic);
                     });
                 });
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 5abb0e02e58..2a49c14e355 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3193,7 +3193,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             admin.topics().createSubscription(partitionedTopicName + 
"-partition-" + startPartitions, subName1,
                     MessageId.earliest);
             fail("Unexpected behaviour");
-        } catch (PulsarAdminException.PreconditionFailedException ex) {
+        } catch (PulsarAdminException.ConflictException ex) {
             // OK
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 590edc2d3f3..c9138beee52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -27,7 +27,10 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         conf.setAllowAutoTopicCreation(true);
         conf.setDefaultNumPartitions(3);
+        conf.setForceDeleteNamespaceAllowed(true);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -186,4 +191,56 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         }
 
     }
+
+    @Test
+    public void testClientWithAutoCreationGotNotFoundException() throws 
PulsarAdminException, PulsarClientException {
+        final String namespace = "public/test_1";
+        final String topicName = 
"persistent://public/test_1/test_auto_creation_got_not_found"
+                + System.currentTimeMillis();
+        final int retryTimes = 30;
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setAutoTopicCreation(namespace, 
AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType("non-partitioned")
+                .build());
+
+        @Cleanup("shutdown")
+        final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+        @Cleanup("shutdown")
+        final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+        for (int i = 0; i < retryTimes; i++) {
+            final CompletableFuture<Void> adminListSub = 
CompletableFuture.runAsync(() -> {
+                try {
+                    admin.topics().getSubscriptions(topicName);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor1);
+
+            final CompletableFuture<Consumer<byte[]>> consumerSub = 
CompletableFuture.supplyAsync(() -> {
+                try {
+                    return pulsarClient.newConsumer()
+                            .topic(topicName)
+                            .subscriptionName("sub-1")
+                            .subscribe();
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor2);
+
+            try {
+                adminListSub.join();
+            } catch (Throwable ex) {
+                // we don't care the exception.
+            }
+
+            consumerSub.join().close();
+            admin.topics().delete(topicName, true);
+        }
+
+        admin.namespaces().deleteNamespace(namespace, true);
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 9995b6a28a9..6f60a13fd48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -457,8 +457,7 @@ public class PersistentTopicTest extends BrokerTestBase {
                     .topic(partition.toString())
                     .create();
             fail("unexpected behaviour");
-        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
+        } catch (PulsarClientException.NotAllowedException ex) {
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }

Reply via email to