michaeljmarshall commented on code in PR #19153:
URL: https://github.com/apache/pulsar/pull/19153#discussion_r1085749011


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1033,17 +1047,33 @@ public CompletableFuture<Optional<Topic>> 
getTopic(final TopicName topicName, bo
                 });
             } else {
             return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                         return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
                             if (topicName.getPartitionIndex() < 
metadata.partitions) {
-                                return createNonPersistentTopic(name);
+                                topicEventsDispatcher
+                                        .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+
+                                CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                                topicEventsDispatcher.notifyOnCompletion(res, 
topicName.toString(), TopicEvent.CREATE);
+                                topicEventsDispatcher.notifyOnCompletion(res, 
topicName.toString(), TopicEvent.LOAD);
+                                return res;
                             }
+                            topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
                             return 
CompletableFuture.completedFuture(Optional.empty());
                         });
                     } else if (createIfMissing) {
-                        return createNonPersistentTopic(name);
+                        topicEventsDispatcher.notify(name, TopicEvent.CREATE, 
EventStage.BEFORE);
+
+                        CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                        topicEventsDispatcher.notifyOnCompletion(res, name, 
TopicEvent.CREATE);
+                        topicEventsDispatcher.notifyOnCompletion(res, name, 
TopicEvent.LOAD);

Review Comment:
   It looks like `CREATE` always implies `LOAD` in this PR. Do we expect this 
to change? I wonder if we should declare in the design that `CREATE` always 
implies `LOAD` and then we can skip the extra notification because 
implementations know that `CREATE` also means `LOAD`. Obviously, we still need 
`LOAD` because `LOAD` does not imply `CREATE`. (This is not a strong 
suggestion, just wondering about messages and the contract we're creating.)
   
   Thinking about this further, I see that on these lines, we're completing 
`LOAD` and `CREATE` events that we already started. Perhaps collapsing the 
events would make the design too confusing.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1033,17 +1047,33 @@ public CompletableFuture<Optional<Topic>> 
getTopic(final TopicName topicName, bo
                 });
             } else {
             return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                         return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
                             if (topicName.getPartitionIndex() < 
metadata.partitions) {
-                                return createNonPersistentTopic(name);
+                                topicEventsDispatcher
+                                        .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
+
+                                CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
+
+                                topicEventsDispatcher.notifyOnCompletion(res, 
topicName.toString(), TopicEvent.CREATE);
+                                topicEventsDispatcher.notifyOnCompletion(res, 
topicName.toString(), TopicEvent.LOAD);

Review Comment:
   Nit: do we care about the order of these notifications? Technically, the 
`LOAD` will come before the `CREATE` in cases where the `res` is not yet 
completed because future callbacks are tracked with a stack and are completed 
in reverse order.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2153,7 +2211,9 @@ private CompletableFuture<Void> 
removeTopicFutureFromCache(String topic,
         TopicName topicName = TopicName.get(topic);
         return pulsar.getNamespaceService().getBundleAsync(topicName)
                 .thenAccept(namespaceBundle -> {
+                    topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, 
EventStage.BEFORE);
                     removeTopicFromCache(topic, namespaceBundle, 
createTopicFuture);
+                    topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, 
EventStage.SUCCESS);

Review Comment:
   Nit: why not add this notification to `removeTopicFromCache`? Then we could 
collapse the duplicated code in this method and in 
`cleanUnloadedTopicFromCache`. 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1064,6 +1094,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final 
TopicName topicName, bo
     }
 
     public CompletableFuture<Void> deleteTopic(String topic, boolean 
forceDelete) {
+        topicEventsDispatcher.notify(topic, TopicEvent.DELETE, 
EventStage.BEFORE);
+        CompletableFuture<Void> result =  deleteTopicInternal(topic, 
forceDelete);
+        topicEventsDispatcher.notifyOnCompletion(result, topic, 
TopicEvent.DELETE);

Review Comment:
   Analogously to `CREATE`/`LOAD`, it looks like we do not notify `UNLOAD` when 
a topic is deleted. I haven't looked at the internals of the 
`deteTopicInternal` method, so correct me if I am wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to