michaeljmarshall commented on code in PR #19153:
URL: https://github.com/apache/pulsar/pull/19153#discussion_r1085749011
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1033,17 +1047,33 @@ public CompletableFuture<Optional<Topic>>
getTopic(final TopicName topicName, bo
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
if (topicName.getPartitionIndex() <
metadata.partitions) {
- return createNonPersistentTopic(name);
+ topicEventsDispatcher
+ .notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
+
+ CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
+
+ topicEventsDispatcher.notifyOnCompletion(res,
topicName.toString(), TopicEvent.CREATE);
+ topicEventsDispatcher.notifyOnCompletion(res,
topicName.toString(), TopicEvent.LOAD);
+ return res;
}
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
return
CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
- return createNonPersistentTopic(name);
+ topicEventsDispatcher.notify(name, TopicEvent.CREATE,
EventStage.BEFORE);
+
+ CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
+
+ topicEventsDispatcher.notifyOnCompletion(res, name,
TopicEvent.CREATE);
+ topicEventsDispatcher.notifyOnCompletion(res, name,
TopicEvent.LOAD);
Review Comment:
It looks like `CREATE` always implies `LOAD` in this PR. Do we expect this
to change? I wonder if we should declare in the design that `CREATE` always
implies `LOAD` and then we can skip the extra notification because
implementations know that `CREATE` also means `LOAD`. Obviously, we still need
`LOAD` because `LOAD` does not imply `CREATE`. (This is not a strong
suggestion, just wondering about messages and the contract we're creating.)
Thinking about this further, I see that on these lines, we're completing
`LOAD` and `CREATE` events that we already started. Perhaps collapsing the
events would make the design too confusing.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1033,17 +1047,33 @@ public CompletableFuture<Optional<Topic>>
getTopic(final TopicName topicName, bo
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
if (topicName.getPartitionIndex() <
metadata.partitions) {
- return createNonPersistentTopic(name);
+ topicEventsDispatcher
+ .notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
+
+ CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
+
+ topicEventsDispatcher.notifyOnCompletion(res,
topicName.toString(), TopicEvent.CREATE);
+ topicEventsDispatcher.notifyOnCompletion(res,
topicName.toString(), TopicEvent.LOAD);
Review Comment:
Nit: do we care about the order of these notifications? Technically, the
`LOAD` will come before the `CREATE` in cases where the `res` is not yet
completed because future callbacks are tracked with a stack and are completed
in reverse order.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2153,7 +2211,9 @@ private CompletableFuture<Void>
removeTopicFutureFromCache(String topic,
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
+ topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD,
EventStage.BEFORE);
removeTopicFromCache(topic, namespaceBundle,
createTopicFuture);
+ topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD,
EventStage.SUCCESS);
Review Comment:
Nit: why not add this notification to `removeTopicFromCache`? Then we could
collapse the duplicated code in this method and in
`cleanUnloadedTopicFromCache`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1064,6 +1094,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final
TopicName topicName, bo
}
public CompletableFuture<Void> deleteTopic(String topic, boolean
forceDelete) {
+ topicEventsDispatcher.notify(topic, TopicEvent.DELETE,
EventStage.BEFORE);
+ CompletableFuture<Void> result = deleteTopicInternal(topic,
forceDelete);
+ topicEventsDispatcher.notifyOnCompletion(result, topic,
TopicEvent.DELETE);
Review Comment:
Analogously to `CREATE`/`LOAD`, it looks like we do not notify `UNLOAD` when
a topic is deleted. I haven't looked at the internals of the
`deteTopicInternal` method, so correct me if I am wrong.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]