This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6e3d5d8a9dc [fix][broker][branch-4.0] Fix failed
testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts
(#24947)
6e3d5d8a9dc is described below
commit 6e3d5d8a9dc62db0864f852a87295bf1e75a6e07
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 5 21:31:50 2025 +0800
[fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading
due to topic future cache conflicts (#24947)
---
.../pulsar/broker/service/BrokerService.java | 51 +++++++++++-----------
1 file changed, 25 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0ee7f97b0a0..168f0fccd3e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1117,31 +1117,7 @@ public class BrokerService implements Closeable {
// The topic level policies are not needed now, but the
meaning of calling
// "getTopicPoliciesBypassSystemTopic" will wait for
system topic policies initialization.
getTopicPoliciesBypassSystemTopic(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY)
- .thenCompose(optionalTopicPolicies -> {
- if (topicName.isPartitioned()) {
- final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
- return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
- .thenCompose((metadata) -> {
- // Allow creating non-partitioned
persistent topic that name includes
- // `partition`
- if (metadata.partitions == 0
- ||
topicName.getPartitionIndex() < metadata.partitions) {
- return
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-
loadOrCreatePersistentTopic(context));
- } else {
- final String errorMsg =
- String.format("Illegal
topic partition name %s with max allowed "
- + "%d partitions",
topicName, metadata.partitions);
- log.warn(errorMsg);
- return FutureUtil.failedFuture(
- new
BrokerServiceException.NotAllowedException(errorMsg));
- }
- });
- } else {
- return
topics.computeIfAbsent(topicName.toString(), (tpName) ->
- loadOrCreatePersistentTopic(context));
- }
- }).thenRun(() -> {
+ .thenRun(() -> {
final var inserted = new MutableBoolean(false);
final var cachedFuture =
topics.computeIfAbsent(topicName.toString(), ___ -> {
inserted.setTrue();
@@ -1678,9 +1654,32 @@ public class BrokerService implements Closeable {
* loading and puts them into queue once in-process topics are created.
*/
protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(TopicLoadingContext context) {
+ final var topicName = context.getTopicName();
final var topic = context.getTopicName().toString();
+ final CompletableFuture<Void> ownedFuture;
+ if (topicName.isPartitioned()) {
+ final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
+ ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity)
+ .thenCompose((metadata) -> {
+ // Allow creating non-partitioned persistent topic
that name includes
+ // `partition`
+ if (metadata.partitions == 0
+ || topicName.getPartitionIndex() <
metadata.partitions) {
+ return checkTopicNsOwnership(topic);
+ } else {
+ final String errorMsg =
+ String.format("Illegal topic partition
name %s with max allowed "
+ + "%d partitions", topicName,
metadata.partitions);
+ log.warn(errorMsg);
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.NotAllowedException(errorMsg));
+ }
+ });
+ } else {
+ ownedFuture = checkTopicNsOwnership(topic);
+ }
final var topicFuture = context.getTopicFuture();
- checkTopicNsOwnership(topic)
+ ownedFuture
.thenRun(() -> {
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();