This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ca4a5d18969 [Broker] Do not create missing topic when loading namespace (#13948)
ca4a5d18969 is described below
commit ca4a5d18969add06c0ae0bc495e48835b28602a9
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Jan 28 02:25:51 2022 -0600
[Broker] Do not create missing topic when loading namespace (#13948)
### Motivation
Based on my reading of the `PulsarService#loadNamespaceTopics` method, it
appears that there is a data race that could lead to the broker recreating a
topic that was just deleted. Currently, we do the following when loading a
namespace bundle:
1. Get all topics for a namespace bundle from the metadata store.
2. Asynchronously call `getOrCreateTopic` for each of the topics.
If a topic was deleted after step 1 and before step 2, the topic will get
recreated (assuming that auto topic creation is enabled).
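
The interleaving described above can be sketched in isolation. This is a minimal illustration, not Pulsar code: the `ConcurrentHashMap` is a hypothetical stand-in for the metadata store, and `getOrCreate` / `getIfExists` are simplified analogues of `getOrCreateTopic` / `getTopicIfExists` with auto topic creation assumed to be enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class TopicLoadRaceSketch {
    // Hypothetical in-memory stand-in for the metadata store (assumption, not Pulsar's API).
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Simplified analogue of getOrCreateTopic: creates the entry when it is missing.
    static String getOrCreate(String topic) {
        return STORE.computeIfAbsent(topic, t -> "created:" + t);
    }

    // Simplified analogue of getTopicIfExists: never creates anything.
    static Optional<String> getIfExists(String topic) {
        return Optional.ofNullable(STORE.get(topic));
    }

    // Replays the interleaving: list topics, delete one, then load the listed topics.
    // Returns true if the deleted topic is present in the store afterwards.
    static boolean topicSurvivesDeletion(boolean createIfMissing) {
        String name = "persistent://tenant/ns/t1"; // illustrative topic name
        STORE.clear();
        STORE.put(name, "existing");

        // Step 1: snapshot the topic list.
        List<String> listed = new ArrayList<>(STORE.keySet());

        // The topic is deleted after step 1 and before step 2.
        STORE.remove(name);

        // Step 2: load every topic from the (now stale) snapshot.
        for (String topic : listed) {
            if (createIfMissing) {
                getOrCreate(topic);
            } else {
                getIfExists(topic);
            }
        }
        return STORE.containsKey(name);
    }

    public static void main(String[] args) {
        System.out.println(topicSurvivesDeletion(true));  // true: the topic was recreated
        System.out.println(topicSurvivesDeletion(false)); // false: the topic stays deleted
    }
}
```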
### Modifications
* Use `getTopicIfExists` to load a namespace bundle instead of
`getOrCreateTopic`.
* Only log the count of actually loaded topics.
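
As a rough, standalone sketch of the second bullet: once the futures hold `Optional` values, the size of the list no longer equals the number of loaded topics, so the present values are counted instead. The class and method names here are hypothetical and `String` stands in for `Topic`. Note that `getNow` rethrows if a future completed exceptionally, so this sketch assumes it only runs after all futures have completed normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class LoadedTopicCountSketch {
    // Counts futures that have already completed with a non-empty Optional.
    // Futures that are still pending fall back to Optional.empty() and are not counted.
    static long countLoaded(List<CompletableFuture<Optional<String>>> futures) {
        return futures.stream()
                .filter(f -> f.getNow(Optional.empty()).isPresent())
                .count();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Optional<String>>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(Optional.of("t1"))); // topic exists
        futures.add(CompletableFuture.completedFuture(Optional.empty()));  // topic was deleted
        futures.add(new CompletableFuture<>());                            // still pending

        System.out.println(futures.size());       // 3
        System.out.println(countLoaded(futures)); // 1
    }
}
```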
### Verifying this change
From my perspective, this is a minor change that should remove the
possibility of a very specific race condition. I don't see an easy way to test
this behavior. However, the `getTopics` method, which backs both
`getTopicIfExists` and `getOrCreateTopic`, is well tested, so I think it is okay
to skip adding a test here. Let me know if you disagree.
(cherry picked from commit 740268832101fa8653a627dd3e4e1f629822e2f4)
---
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index acbd1ea33e4..a95ba68344a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1069,7 +1069,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
LOG.info("Loading all topics on bundle: {}", bundle);
NamespaceName nsName = bundle.getNamespaceObject();
- List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
+ List<CompletableFuture<Optional<Topic>>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();
for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)
@@ -1077,7 +1077,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
- CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
+ CompletableFuture<Optional<Topic>> future = brokerService.getTopicIfExists(topic);
if (future != null) {
persistentTopics.add(future);
}
@@ -1091,7 +1091,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
FutureUtil.waitForAll(persistentTopics).thenRun(() -> {
double topicLoadTimeSeconds = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - topicLoadStart)
/ 1000.0;
- LOG.info("Loaded {} topics on {} -- time taken: {} seconds", persistentTopics.size(), bundle,
+ long numTopicsLoaded = persistentTopics.stream()
+ .filter(optionalTopicFuture -> optionalTopicFuture.getNow(Optional.empty()).isPresent())
+ .count();
+ LOG.info("Loaded {} topics on {} -- time taken: {} seconds", numTopicsLoaded, bundle,
topicLoadTimeSeconds);
});
}