codelipenghui commented on code in PR #21545:
URL: https://github.com/apache/pulsar/pull/21545#discussion_r1386592180
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>>
getTopic(final TopicName topicName, bo
return
loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
- return
CompletableFuture.completedFuture(Optional.empty());
+ final String info =
+ String.format("Creating a
topic encountered an illegal partition name."
+ + "
topic_name=%s metadata_partition_number=%s",
+ topicName,
metadata.partitions);
+ log.warn(info);
+ return CompletableFuture
+ .failedFuture(new
BrokerServiceException.NamingException(info));
});
}
return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
Review Comment:
Sorry, the above solution will not work since the future is created in the
map.computeIfAbsent.
```diff
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -90,6 +90,7 @@ import
org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -1064,34 +1065,31 @@ public class BrokerService implements Closeable {
throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
- return topics.computeIfAbsent(topicName.toString(),
(tpName) -> {
- if (topicName.isPartitioned()) {
- final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
- return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
- .thenCompose((metadata) -> {
- // Allow crate non-partitioned
persistent topic that name includes `partition`
- if (metadata.partitions == 0
- ||
topicName.getPartitionIndex() < metadata.partitions) {
- return
loadOrCreatePersistentTopic(tpName, createIfMissing,
- properties,
topicPolicies);
- }
- final String info =
- String.format("Creating a
topic encountered an illegal partition name."
- + "
topic_name=%s metadata_partition_number=%s",
- topicName,
metadata.partitions);
- log.warn(info);
- return CompletableFuture
- .failedFuture(new
BrokerServiceException.NamingException(info));
- });
- }
- return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
- }).thenCompose(optionalTopic -> {
- if (!optionalTopic.isPresent() && createIfMissing) {
- log.warn("Different topic automatic creation
strategies lead to race conditions. "
- + "Try again to try to recover.
topic_name={}", topicName);
- return getTopic(topicName, true, properties);
+ CompletableFuture<Boolean> isValidTopic =
CompletableFuture.completedFuture(true);
+ if (topicName.isPartitioned()) {
+ final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
+ isValidTopic =
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+ .thenApply((metadata) -> {
+ // Allow crate non-partitioned
persistent topic that name includes `partition`
+ return metadata.partitions == 0
+ ||
topicName.getPartitionIndex() < metadata.partitions;
+ });
+ }
+ return isValidTopic.thenCompose(isValid -> {
+ if (isValid) {
+ return
topics.computeIfAbsent(topicName.toString(), (tpName) -> {
+ return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent() &&
createIfMissing) {
+ log.warn("Different topic automatic
creation strategies lead to race conditions. "
+ + "Try again to try to recover.
topic_name={}", topicName);
+ return getTopic(topicName, true,
properties);
+ }
+ return
CompletableFuture.completedFuture(optionalTopic);
+ });
+ } else {
+ return
CompletableFuture.completedFuture(Optional.empty());
}
- return
CompletableFuture.completedFuture(optionalTopic);
});
});
} else {
```
It looks like we don't need to check if the topic name is a valid topic name
in the map.computeIfAbsent? So I tried the above changes. @mattisonchao
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]