This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f8069df045fdaa14d3bec2ed811b7e8c94a232da Author: Zixuan Liu <[email protected]> AuthorDate: Thu Nov 6 15:48:49 2025 +0800 [fix][broker] Trigger topic creation event only once for non-existent topic (#24802) (cherry picked from commit d168ab8ada8e586d4e2e2f78ff11762017ff7051) --- .../pulsar/broker/service/BrokerService.java | 14 ++-- .../pulsar/broker/TopicEventsListenerTest.java | 80 +++++++++++++++++++--- .../pulsar/broker/service/PersistentTopicTest.java | 3 + .../pulsar/broker/service/ServerCnxTest.java | 3 + 4 files changed, 88 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 168f0fccd3e..cab7375cd06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1794,7 +1794,11 @@ public class BrokerService implements Closeable { maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated) .thenCompose(__ -> getManagedLedgerConfig(topicName)) - .thenAccept(managedLedgerConfig -> { + .thenCombine(pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenApply(n -> { + boolean found = n.isExists(); + n.recycle(); + return found; + }), (managedLedgerConfig, exists) -> { if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { // init managedLedger interceptor Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>(); @@ -1831,8 +1835,10 @@ public class BrokerService implements Closeable { }); if (createIfMissing) { - topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE); - topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE); + if (!exists) { + topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE); + topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE); + } } topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD); @@ -1922,7 +1928,7 @@ public class BrokerService implements Closeable { } } }, () -> isTopicNsOwnedByBrokerAsync(topicName), null); - + return null; }).exceptionally((exception) -> { boolean migrationFailure = exception.getCause() instanceof TopicMigratedException; String msg = migrationFailure ? "Topic is already migrated" : diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index b597d98efc3..fb45094c569 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; @@ -265,14 +266,30 @@ public class TopicEventsListenerTest extends BrokerTestBase { final String[] expectedEvents; if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) { if (topicTypePartitioned.equals("partitioned")) { - expectedEvents = new String[]{ - "CREATE__BEFORE", - "CREATE__SUCCESS", - "LOAD__BEFORE", - "CREATE__BEFORE", - "CREATE__SUCCESS", - "LOAD__SUCCESS" - }; + if (topicDomain.equalsIgnoreCase("persistent")) { + expectedEvents = new String[]{ + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__BEFORE", + "LOAD__SUCCESS" + }; + } else { + // For non-persistent partitioned topic, only metadata is initially created; + // partitions are created when the client connects. + // PR #23680 currently records creation events at metadata creation, + // and the broker records them again when partitions are loaded, + // which can result in multiple events. + // Ideally, #23680 should not record the event here, + // because the topic is not fully created until the client connects. + expectedEvents = new String[]{ + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS", + }; + } } else { expectedEvents = new String[]{ "LOAD__BEFORE", @@ -308,6 +325,53 @@ public class TopicEventsListenerTest extends BrokerTestBase { Assert.assertEquals(events.toArray(), expectedEvents)); } + @DataProvider(name = "createTopicEventType") + public static Object[][] createTopicEventType() { + return new Object[][] { + {"persistent", "partitioned"}, + {"persistent", "non-partitioned"}, + {"non-persistent", "partitioned"}, + {"non-persistent", "non-partitioned"}, + }; + } + + @Test(dataProvider = "createTopicEventType") + public void testCreateTopicEvent(String topicTypePersistence, String topicTypePartitioned) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + events.clear(); + if (topicTypePartitioned.equals("partitioned")) { + topicNameToWatch = topicName + "-partition-0"; + admin.topics().createPartitionedTopic(topicName, 1); + } else { + topicNameToWatch = topicName; + admin.topics().createNonPartitionedTopic(topicName); + } + + triggerPartitionsCreation(topicName); // ensure partitions are really created + triggerPartitionsCreation(topicName); // trigger again to ensure no duplicate events + + Awaitility.await().during(3, TimeUnit.SECONDS) + .untilAsserted(() -> { + if (topicTypePartitioned.equals("partitioned") && topicTypePersistence.equals("non-persistent")) { + // For non-persistent partitioned topic, only metadata is initially created; + // partitions are created when the client connects. + // PR #23680 currently records creation events at metadata creation, + // and the broker records them again when partitions are loaded, + // which can result in multiple events. + // Ideally, #23680 should not record the event here, + // because the topic is not fully created until the client connects. + assertThat(events.toArray()) + .contains("CREATE__BEFORE") + .contains("CREATE__SUCCESS"); + } else { + assertThat(events.toArray()) + .containsOnlyOnce("CREATE__BEFORE") + .containsOnlyOnce("CREATE__SUCCESS"); + } + }); + } + private PulsarAdmin createPulsarAdmin() throws PulsarClientException { return PulsarAdmin.builder() .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index eaec93f78a1..ca2c2648266 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -101,6 +101,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -227,6 +228,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(true).when(nsSvc).isServiceUnitOwned(any()); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); + doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc) + .checkTopicExistsAsync(any()); setupMLAsyncCallbackMocks(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2cfbac35bfc..a19bc017574 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -105,6 +105,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.ServerCnx.State; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -237,6 +238,8 @@ public class ServerCnxTest { NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics( NamespaceName.get("use", "ns-abc")); + doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(namespaceService) + .checkTopicExistsAsync(any()); setupMLAsyncCallbackMocks();
