codelipenghui commented on code in PR #21545:
URL: https://github.com/apache/pulsar/pull/21545#discussion_r1386592180


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1075,10 +1075,23 @@ public CompletableFuture<Optional<Topic>> 
getTopic(final TopicName topicName, bo
                                             return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
                                                     properties, topicPolicies);
                                         }
-                                        return 
CompletableFuture.completedFuture(Optional.empty());
+                                        final String info =
+                                                String.format("Creating a 
topic encountered an illegal partition name."
+                                                                + " 
topic_name=%s metadata_partition_number=%s",
+                                                        topicName, 
metadata.partitions);
+                                        log.warn(info);
+                                        return CompletableFuture
+                                                .failedFuture(new 
BrokerServiceException.NamingException(info));
                                     });
                         }
                         return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                    }).thenCompose(optionalTopic -> {

Review Comment:
   Sorry, the above solution will not work since the future is created in the 
map.computeIfAbsent.
   
   
   ```diff
   --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
   +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
   @@ -90,6 +90,7 @@ import 
org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
    import org.apache.bookkeeper.mledger.util.Futures;
    import org.apache.commons.collections4.MapUtils;
    import org.apache.commons.lang3.StringUtils;
   +import org.apache.commons.lang3.tuple.Pair;
    import 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
    import org.apache.pulsar.broker.PulsarServerException;
    import org.apache.pulsar.broker.PulsarService;
   @@ -1064,34 +1065,31 @@ public class BrokerService implements Closeable {
                        throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                    }).thenCompose(optionalTopicPolicies -> {
                        final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
   -                    return topics.computeIfAbsent(topicName.toString(), 
(tpName) -> {
   -                        if (topicName.isPartitioned()) {
   -                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
   -                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
   -                                    .thenCompose((metadata) -> {
   -                                        // Allow crate non-partitioned 
persistent topic that name includes `partition`
   -                                        if (metadata.partitions == 0
   -                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
   -                                            return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
   -                                                    properties, 
topicPolicies);
   -                                        }
   -                                        final String info =
   -                                                String.format("Creating a 
topic encountered an illegal partition name."
   -                                                                + " 
topic_name=%s metadata_partition_number=%s",
   -                                                        topicName, 
metadata.partitions);
   -                                        log.warn(info);
   -                                        return CompletableFuture
   -                                                .failedFuture(new 
BrokerServiceException.NamingException(info));
   -                                    });
   -                        }
   -                        return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
   -                    }).thenCompose(optionalTopic -> {
   -                        if (!optionalTopic.isPresent() && createIfMissing) {
   -                            log.warn("Different topic automatic creation 
strategies lead to race conditions. "
   -                                    + "Try again to try to recover. 
topic_name={}", topicName);
   -                            return getTopic(topicName, true, properties);
   +                    CompletableFuture<Boolean> isValidTopic = 
CompletableFuture.completedFuture(true);
   +                    if (topicName.isPartitioned()) {
   +                        final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
   +                        isValidTopic = 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
   +                                .thenApply((metadata) -> {
   +                                    // Allow crate non-partitioned 
persistent topic that name includes `partition`
   +                                    return metadata.partitions == 0
   +                                            || 
topicName.getPartitionIndex() < metadata.partitions;
   +                                });
   +                    }
   +                    return isValidTopic.thenCompose(isValid -> {
   +                        if (isValid) {
   +                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) -> {
   +                                return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
   +                            }).thenCompose(optionalTopic -> {
   +                                if (!optionalTopic.isPresent() && 
createIfMissing) {
   +                                    log.warn("Different topic automatic 
creation strategies lead to race conditions. "
   +                                            + "Try again to try to recover. 
topic_name={}", topicName);
   +                                    return getTopic(topicName, true, 
properties);
   +                                }
   +                                return 
CompletableFuture.completedFuture(optionalTopic);
   +                            });
   +                        } else {
   +                            return 
CompletableFuture.completedFuture(Optional.empty());
                            }
   -                        return 
CompletableFuture.completedFuture(optionalTopic);
                        });
                    });
                } else {
   ```
   
   It looks like we don't need to check if the topic name is a valid topic name 
in the map.computeIfAbsent? So I tried the above changes. @mattisonchao 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to