michaeljmarshall commented on code in PR #20540:
URL: https://github.com/apache/pulsar/pull/20540#discussion_r1253375383
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -3007,7 +3048,13 @@ private ConcurrentOpenHashMap<String, Object>
getRuntimeConfigurationMap() {
* permit if it was successful to acquire it.
*/
private void createPendingLoadTopic() {
- TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll();
+ if (!pulsar().isRunning()) {
+ log.warn("Pulsar is not running, skip create pending topic");
+ failPendingTopics();
Review Comment:
What is the purpose of calling this here if the `closeAsync` method also
calls it? The logic is correct, but it seems like we could skip this line and
leave the cleanup logic to the `closeAsync` method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -3031,14 +3078,35 @@ private void createPendingLoadTopic() {
});
}).exceptionally(e -> {
log.error("Failed to create pending topic {}", topic, e);
- pendingTopic.getTopicFuture()
- .completeExceptionally((e instanceof RuntimeException &&
e.getCause() != null) ? e.getCause() : e);
+ Throwable cause = (e instanceof RuntimeException && e.getCause()
!= null) ? e.getCause() : e;
+ failTopicFuture(topic, pendingTopic.getTopicFuture(), cause);
// schedule to process next pending topic
inactivityMonitor.schedule(this::createPendingLoadTopic, 100,
TimeUnit.MILLISECONDS);
return null;
});
}
+ private void failPendingTopics() {
+ for (TopicLoadingContext topicLoadingContext = getNextPendingTopic();
topicLoadingContext != null;
+ topicLoadingContext = getNextPendingTopic()) {
+ topicLoadingContext.getTopicFuture()
+ .completeExceptionally(new NotAllowedException("Broker is
shutting down"));
Review Comment:
Nit: what do you think about `ServiceNotReady` instead? I haven't looked too
closely, but `NotAllowed` might not give the right client side behavior:
https://github.com/apache/pulsar/blob/21c7c628d2a3fd7277f16fe7ba72154cfbf08128/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L786-L789
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -2264,18 +2315,8 @@ private Optional<CompletableFuture<Optional<Topic>>>
findTopicFutureInCache(Topi
}
}
- private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
-
CompletableFuture<Optional<Topic>> createTopicFuture) {
- TopicName topicName = TopicName.get(topic);
- return pulsar.getNamespaceService().getBundleAsync(topicName)
- .thenAccept(namespaceBundle -> {
- removeTopicFromCache(topic, namespaceBundle,
createTopicFuture);
- });
- }
-
- private void removeTopicFromCache(String topic, NamespaceBundle
namespaceBundle,
- CompletableFuture<Optional<Topic>>
createTopicFuture) {
- String bundleName = namespaceBundle.toString();
+ private void removeTopicFromCache(String topic,
+ String bundleName,
CompletableFuture<Optional<Topic>> createTopicFuture) {
Review Comment:
Nit: I prefer to keep the types as long as possible to prevent misuse of a
method in the future. Why remove the `NamespaceBundle` type here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]