This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d168ab8ada8 [fix][broker] Trigger topic creation event only once for non-existent topic (#24802)
d168ab8ada8 is described below

commit d168ab8ada8e586d4e2e2f78ff11762017ff7051
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Nov 6 15:48:49 2025 +0800

    [fix][broker] Trigger topic creation event only once for non-existent topic (#24802)
---
 .../pulsar/broker/service/BrokerService.java       | 14 ++--
 .../pulsar/broker/TopicEventsListenerTest.java     | 80 +++++++++++++++++++---
 .../pulsar/broker/service/PersistentTopicTest.java |  3 +
 .../pulsar/broker/service/ServerCnxTest.java       |  3 +
 4 files changed, 88 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index db58e2e6525..3323b9db713 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1824,7 +1824,11 @@ public class BrokerService implements Closeable {
         maxTopicsCheck.thenCompose(partitionedTopicMetadata -> 
validateTopicConsistency(topicName))
                 .thenCompose(__ -> isTopicAlreadyMigrated)
                 .thenCompose(__ -> getManagedLedgerConfig(topicName))
-        .thenAccept(managedLedgerConfig -> {
+                
.thenCombine(pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenApply(n
 -> {
+                            boolean found = n.isExists();
+                            n.recycle();
+                            return found;
+                        }), (managedLedgerConfig, exists) -> {
             if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
                 // init managedLedger interceptor
                 Set<BrokerEntryMetadataInterceptor> interceptors = new 
HashSet<>();
@@ -1861,8 +1865,10 @@ public class BrokerService implements Closeable {
             });
 
             if (createIfMissing) {
-                topicEventsDispatcher.notify(topic, TopicEvent.CREATE, 
EventStage.BEFORE);
-                topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, 
TopicEvent.CREATE);
+                if (!exists) {
+                    topicEventsDispatcher.notify(topic, TopicEvent.CREATE, 
EventStage.BEFORE);
+                    topicEventsDispatcher.notifyOnCompletion(topicFuture, 
topic, TopicEvent.CREATE);
+                }
             }
             topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, 
TopicEvent.LOAD);
 
@@ -1952,7 +1958,7 @@ public class BrokerService implements Closeable {
                             }
                         }
                     }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
-
+            return null;
         }).exceptionally((exception) -> {
             boolean migrationFailure = exception.getCause() instanceof 
TopicMigratedException;
             String msg = migrationFailure ? "Topic is already migrated" :
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index b597d98efc3..fb45094c569 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
@@ -265,14 +266,30 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
         final String[] expectedEvents;
         if (topicDomain.equalsIgnoreCase("persistent") || 
topicTypePartitioned.equals("partitioned")) {
             if (topicTypePartitioned.equals("partitioned")) {
-                expectedEvents = new String[]{
-                        "CREATE__BEFORE",
-                        "CREATE__SUCCESS",
-                        "LOAD__BEFORE",
-                        "CREATE__BEFORE",
-                        "CREATE__SUCCESS",
-                        "LOAD__SUCCESS"
-                };
+                if (topicDomain.equalsIgnoreCase("persistent")) {
+                    expectedEvents = new String[]{
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__BEFORE",
+                            "LOAD__SUCCESS"
+                    };
+                } else {
+                    // For non-persistent partitioned topic, only metadata is 
initially created;
+                    // partitions are created when the client connects.
+                    // PR #23680 currently records creation events at metadata 
creation,
+                    // and the broker records them again when partitions are 
loaded,
+                    // which can result in multiple events.
+                    // Ideally, #23680 should not record the event here,
+                    // because the topic is not fully created until the client 
connects.
+                    expectedEvents = new String[]{
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__BEFORE",
+                            "CREATE__BEFORE",
+                            "CREATE__SUCCESS",
+                            "LOAD__SUCCESS",
+                    };
+                }
             } else {
                 expectedEvents = new String[]{
                         "LOAD__BEFORE",
@@ -308,6 +325,53 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
                 Assert.assertEquals(events.toArray(), expectedEvents));
     }
 
+    @DataProvider(name = "createTopicEventType")
+    public static Object[][] createTopicEventType() {
+        return new Object[][] {
+                {"persistent", "partitioned"},
+                {"persistent", "non-partitioned"},
+                {"non-persistent", "partitioned"},
+                {"non-persistent", "non-partitioned"},
+        };
+    }
+
+    @Test(dataProvider = "createTopicEventType")
+    public void testCreateTopicEvent(String topicTypePersistence, String 
topicTypePartitioned) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            topicNameToWatch = topicName + "-partition-0";
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            topicNameToWatch = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        triggerPartitionsCreation(topicName); // ensure partitions are really 
created
+        triggerPartitionsCreation(topicName); // trigger again to ensure no 
duplicate events
+
+        Awaitility.await().during(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    if (topicTypePartitioned.equals("partitioned") && 
topicTypePersistence.equals("non-persistent")) {
+                        // For non-persistent partitioned topic, only metadata 
is initially created;
+                        // partitions are created when the client connects.
+                        // PR #23680 currently records creation events at 
metadata creation,
+                        // and the broker records them again when partitions 
are loaded,
+                        // which can result in multiple events.
+                        // Ideally, #23680 should not record the event here,
+                        // because the topic is not fully created until the 
client connects.
+                        assertThat(events.toArray())
+                                .contains("CREATE__BEFORE")
+                                .contains("CREATE__SUCCESS");
+                    } else {
+                        assertThat(events.toArray())
+                                .containsOnlyOnce("CREATE__BEFORE")
+                                .containsOnlyOnce("CREATE__SUCCESS");
+                    }
+                });
+    }
+
     private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
         return PulsarAdmin.builder()
                 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d4306a11859..37e1e058cc2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -101,6 +101,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -226,6 +227,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
+        
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc)
+                .checkTopicExistsAsync(any());
 
         setupMLAsyncCallbackMocks();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2cfbac35bfc..a19bc017574 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -105,6 +105,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -237,6 +238,8 @@ public class ServerCnxTest {
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
                 NamespaceName.get("use", "ns-abc"));
+        
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(namespaceService)
+                .checkTopicExistsAsync(any());
 
         setupMLAsyncCallbackMocks();
 

Reply via email to