This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2ffe47437 [fix][admin] Listen partitioned topic creation event (#23680)
0a2ffe47437 is described below
commit 0a2ffe4743799dc253814d49fa3fd29b933444ac
Author: Zixuan Liu <[email protected]>
AuthorDate: Sun Dec 8 22:26:51 2024 +0800
[fix][admin] Listen partitioned topic creation event (#23680)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++++++++
.../pulsar/broker/TopicEventsListenerTest.java | 23 ++++++++++++++++------
2 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 45772dc279b..4d890a3d5db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -45,6 +45,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
+import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
@@ -166,6 +168,10 @@ public abstract class AdminResource extends PulsarWebResource {
protected CompletableFuture<Void> tryCreatePartitionsAsync(int
numPartitions) {
if (!topicName.isPersistent()) {
+ for (int i = 0; i < numPartitions; i++) {
+ pulsar().getBrokerService().getTopicEventsDispatcher()
+ .notify(topicName.getPartition(i).toString(),
TopicEvent.CREATE, EventStage.SUCCESS);
+ }
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
@@ -201,6 +207,8 @@ public abstract class AdminResource extends PulsarWebResource {
}
return null;
});
+ pulsar().getBrokerService().getTopicEventsDispatcher()
+ .notifyOnCompletion(result,
topicName.getPartition(partition).toString(), TopicEvent.CREATE);
return result;
}
@@ -594,6 +602,13 @@ public abstract class AdminResource extends PulsarWebResource {
throw new RestException(Status.CONFLICT, "This topic
already exists");
}
})
+ .thenRun(() -> {
+ for (int i = 0; i < numPartitions; i++) {
+ pulsar().getBrokerService().getTopicEventsDispatcher()
+ .notify(topicName.getPartition(i).toString(),
TopicEvent.CREATE,
+ EventStage.BEFORE);
+ }
+ })
.thenCompose(__ ->
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index ceb3c1d0d93..152b4aeeeb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -265,12 +265,23 @@ public class TopicEventsListenerTest extends BrokerTestBase {
private void createTopicAndVerifyEvents(String topicDomain, String
topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
if (topicDomain.equalsIgnoreCase("persistent") ||
topicTypePartitioned.equals("partitioned")) {
- expectedEvents = new String[]{
- "LOAD__BEFORE",
- "CREATE__BEFORE",
- "CREATE__SUCCESS",
- "LOAD__SUCCESS"
- };
+ if (topicTypePartitioned.equals("partitioned")) {
+ expectedEvents = new String[]{
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__BEFORE",
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__SUCCESS"
+ };
+ } else {
+ expectedEvents = new String[]{
+ "LOAD__BEFORE",
+ "CREATE__BEFORE",
+ "CREATE__SUCCESS",
+ "LOAD__SUCCESS"
+ };
+ }
} else {
expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995,
Pulsar will skip create topic if the topic