This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7dcc25554e68f94b6b1dbd7aef9d61a4e71a079c
Author: Zixuan Liu <[email protected]>
AuthorDate: Sun Dec 8 22:26:51 2024 +0800

    [fix][admin] Listen partitioned topic creation event (#23680)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 0a2ffe4743799dc253814d49fa3fd29b933444ac)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 ++++++++++++++
 .../pulsar/broker/TopicEventsListenerTest.java     | 23 ++++++++++++++++------
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 76add468915..1daf8d53bac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -44,6 +44,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
+import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -162,6 +164,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     protected CompletableFuture<Void> tryCreatePartitionsAsync(int 
numPartitions) {
         if (!topicName.isPersistent()) {
+            for (int i = 0; i < numPartitions; i++) {
+                pulsar().getBrokerService().getTopicEventsDispatcher()
+                        .notify(topicName.getPartition(i).toString(), 
TopicEvent.CREATE, EventStage.SUCCESS);
+            }
             return CompletableFuture.completedFuture(null);
         }
         List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
@@ -197,6 +203,8 @@ public abstract class AdminResource extends 
PulsarWebResource {
                     }
                     return null;
                 });
+        pulsar().getBrokerService().getTopicEventsDispatcher()
+                .notifyOnCompletion(result, 
topicName.getPartition(partition).toString(), TopicEvent.CREATE);
         return result;
     }
 
@@ -598,6 +606,13 @@ public abstract class AdminResource extends 
PulsarWebResource {
                         throw new RestException(Status.CONFLICT, "This topic 
already exists");
                     }
                 })
+                .thenRun(() -> {
+                    for (int i = 0; i < numPartitions; i++) {
+                        pulsar().getBrokerService().getTopicEventsDispatcher()
+                                .notify(topicName.getPartition(i).toString(), 
TopicEvent.CREATE,
+                                        EventStage.BEFORE);
+                    }
+                })
                 .thenCompose(__ -> 
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
                 .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
                 .thenRun(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index ceb3c1d0d93..152b4aeeeb2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -265,12 +265,23 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
     private void createTopicAndVerifyEvents(String topicDomain, String 
topicTypePartitioned, String topicName) throws Exception {
         final String[] expectedEvents;
         if (topicDomain.equalsIgnoreCase("persistent") || 
topicTypePartitioned.equals("partitioned")) {
-            expectedEvents = new String[]{
-                    "LOAD__BEFORE",
-                    "CREATE__BEFORE",
-                    "CREATE__SUCCESS",
-                    "LOAD__SUCCESS"
-            };
+            if (topicTypePartitioned.equals("partitioned")) {
+                expectedEvents = new String[]{
+                        "CREATE__BEFORE",
+                        "CREATE__SUCCESS",
+                        "LOAD__BEFORE",
+                        "CREATE__BEFORE",
+                        "CREATE__SUCCESS",
+                        "LOAD__SUCCESS"
+                };
+            } else {
+                expectedEvents = new String[]{
+                        "LOAD__BEFORE",
+                        "CREATE__BEFORE",
+                        "CREATE__SUCCESS",
+                        "LOAD__SUCCESS"
+                };
+            }
         } else {
             expectedEvents = new String[]{
                     // Before https://github.com/apache/pulsar/pull/21995, 
Pulsar will skip create topic if the topic

Reply via email to