This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b6a1232cf3 [fix][broker] Inconsistent behaviour for topic auto_creation (#20843)
9b6a1232cf3 is described below

commit 9b6a1232cf35d15b9bf492f60f5e52534d879df2
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jul 21 13:04:30 2023 +0800

    [fix][broker] Inconsistent behaviour for topic auto_creation (#20843)
---
 .../pulsar/broker/service/BrokerService.java       | 16 ++--
 .../impl/HierarchyTopicAutoCreationTest.java       | 96 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 668195c4b80..8dc94ca8740 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3023,12 +3023,9 @@ public class BrokerService implements Closeable {
         if (pulsar.getNamespaceService() == null) {
             return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
         }
-        Optional<Policies> policies =
-                pulsar.getPulsarResources().getNamespaceResources()
-                        .getPoliciesIfCached(topicName.getNamespaceObject());
-        return pulsar.getNamespaceService().checkTopicExists(topicName)
-                .thenCompose(topicExists -> {
-                    return fetchPartitionedTopicMetadataAsync(topicName)
+        return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(policies -> pulsar.getNamespaceService().checkTopicExists(topicName)
+                    .thenCompose(topicExists -> fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
                                 CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
 
@@ -3041,7 +3038,7 @@ public class BrokerService implements Closeable {
                                             && !topicExists
                                             && !topicName.isPartitioned()
                                             && pulsar.getBrokerService()
-                                                            .isDefaultTopicTypePartitioned(topicName, policies)) {
+                                            .isDefaultTopicTypePartitioned(topicName, policies)) {
                                         isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> {
                                             if (allowed) {
                                                 pulsar.getBrokerService()
@@ -3050,7 +3047,7 @@ public class BrokerService implements Closeable {
                                                         .exceptionally(ex -> {
                                                             if (ex.getCause()
                                                                     instanceof MetadataStoreException
-                                                                        .AlreadyExistsException) {
+                                                                    .AlreadyExistsException) {
                                                                 // The partitioned topic might be created concurrently
                                                                 fetchPartitionedTopicMetadataAsync(topicName)
                                                                         .whenComplete((metadata2, ex2) -> {
@@ -3078,8 +3075,7 @@ public class BrokerService implements Closeable {
                                 });
 
                                 return future;
-                            });
-                });
+                            })));
     }
 
     @SuppressWarnings("deprecation")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
new file mode 100644
index 00000000000..8c93b293c41
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.UUID;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+@Slf4j
+public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(invocationCount = 3)
+    @SneakyThrows
+    public void testPartitionedTopicAutoCreation() {
+        // Create namespace
+        final String namespace = "public/testPartitionedTopicAutoCreation";
+        admin.namespaces().createNamespace(namespace);
+        // Set policies
+        final AutoTopicCreationOverride expectedPolicies = AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType("partitioned")
+                .defaultNumPartitions(1)
+                .build();
+        admin.namespaces().setAutoTopicCreation(namespace, expectedPolicies);
+        // Double-check the policies
+        final AutoTopicCreationOverride nsAutoTopicCreationOverride = admin.namespaces()
+                .getAutoTopicCreation(namespace);
+        Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
+        // Background invalidate cache
+        final MetadataCache<Policies> nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache();
+        final Thread t1 = new Thread(() -> {
+            while (true) {
+                nsCache.invalidate("/admin/policies/" + namespace);
+            }
+        });
+        t1.start();
+
+        // trigger auto-creation
+        final String topicName = "persistent://" + namespace + "/test-" + UUID.randomUUID();
+        @Cleanup final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        final List<String> topics = admin.topics().getList(namespace);
+        Assert.assertEquals(topics.size(), 1);  // expect only one topic
+        Assert.assertEquals(topics.get(0),
+                TopicName.get(topicName).getPartition(0).toString()); // expect partitioned topic
+
+        // double-check policies
+        final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace);
+        Assert.assertEquals(actualPolicies2, expectedPolicies);
+
+        t1.interrupt();
+    }
+}

Reply via email to