This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7dcc25554e68f94b6b1dbd7aef9d61a4e71a079c Author: Zixuan Liu <[email protected]> AuthorDate: Sun Dec 8 22:26:51 2024 +0800 [fix][admin] Listen partitioned topic creation event (#23680) Signed-off-by: Zixuan Liu <[email protected]> (cherry picked from commit 0a2ffe4743799dc253814d49fa3fd29b933444ac) --- .../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++++++++ .../pulsar/broker/TopicEventsListenerTest.java | 23 ++++++++++++++++------ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 76add468915..1daf8d53bac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -44,6 +44,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.ClusterResources; +import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; +import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; @@ -162,6 +164,10 @@ public abstract class AdminResource extends PulsarWebResource { protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) { if (!topicName.isPersistent()) { + for (int i = 0; i < numPartitions; i++) { + pulsar().getBrokerService().getTopicEventsDispatcher() + .notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, EventStage.SUCCESS); + } return CompletableFuture.completedFuture(null); } List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions); @@ -197,6 +203,8 @@ public abstract 
class AdminResource extends PulsarWebResource { } return null; }); + pulsar().getBrokerService().getTopicEventsDispatcher() + .notifyOnCompletion(result, topicName.getPartition(partition).toString(), TopicEvent.CREATE); return result; } @@ -598,6 +606,13 @@ public abstract class AdminResource extends PulsarWebResource { throw new RestException(Status.CONFLICT, "This topic already exists"); } }) + .thenRun(() -> { + for (int i = 0; i < numPartitions; i++) { + pulsar().getBrokerService().getTopicEventsDispatcher() + .notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, + EventStage.BEFORE); + } + }) .thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties)) .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions)) .thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index ceb3c1d0d93..152b4aeeeb2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -265,12 +265,23 @@ public class TopicEventsListenerTest extends BrokerTestBase { private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception { final String[] expectedEvents; if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) { - expectedEvents = new String[]{ - "LOAD__BEFORE", - "CREATE__BEFORE", - "CREATE__SUCCESS", - "LOAD__SUCCESS" - }; + if (topicTypePartitioned.equals("partitioned")) { + expectedEvents = new String[]{ + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + } else { + expectedEvents = new String[]{ + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + } } else { expectedEvents 
= new String[]{ // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
