gaoran10 commented on code in PR #22206:
URL: https://github.com/apache/pulsar/pull/22206#discussion_r1607978894
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java:
##########
@@ -308,14 +308,32 @@ static ClientBuilder builder() {
*
* <p>This can be used to discover the partitions and create {@link
Reader}, {@link Consumer} or {@link Producer}
* instances directly on a particular partition.
- *
+ * @Deprecated it is not suggested to use now; please use {@link
#getPartitionsForTopic(String, boolean)}.
* @param topic
* the topic name
* @return a future that will yield a list of the topic partitions or
{@link PulsarClientException} if there was any
* error in the operation.
+ *
* @since 2.3.0
*/
- CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+ @Deprecated
+ default CompletableFuture<List<String>> getPartitionsForTopic(String
topic) {
+ return getPartitionsForTopic(topic, true);
+ }
+
+ /**
+ * 1. Get the partitions if the topic exists. Return "[{partition-0},
{partition-1}....{partition-n}}]" if a
+ * partitioned topic exists; return "[{topic}]" if a non-partitioned
topic exists.
+ * 2. When {@param metadataAutoCreationEnabled} is "false", neither the
partitioned topic nor non-partitioned
+ * topic does not exist. You will get an {@link
PulsarClientException.NotFoundException}.
Review Comment:
Maybe this should be `PulsarClientException.TopicDoesNotExistException`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -607,35 +609,89 @@ protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
-
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
- .handle((metadata, ex) -> {
- if (ex == null) {
- int partitions = metadata.partitions;
-
commandSender.sendPartitionMetadataResponse(partitions, requestId);
- } else {
- if (ex instanceof PulsarClientException) {
- log.warn("Failed to authorize {} at
[{}] on topic {} : {}", getRole(),
- remoteAddress, topicName,
ex.getMessage());
-
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
- ex.getMessage(), requestId);
+ // Get if exists, respond not found error if not exists.
+
getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate
-> {
+ boolean autoCreateIfNotExist =
partitionMetadata.isMetadataAutoCreationEnabled();
+ if (!autoCreateIfNotExist) {
+ final NamespaceResources namespaceResources =
getBrokerService().pulsar()
+
.getPulsarResources().getNamespaceResources();
+ final TopicResources topicResources =
getBrokerService().pulsar().getPulsarResources()
+ .getTopicResources();
+ namespaceResources.getPartitionedTopicResources()
+ .getPartitionedTopicMetadataAsync(topicName,
false)
+ .thenAccept(metadata -> {
+ if (metadata.isPresent()) {
+
commandSender.sendPartitionMetadataResponse(metadata.get().partitions,
+ requestId);
+ lookupSemaphore.release();
+ return;
+ }
+ if (topicName.isPersistent()) {
+
topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
+ if (exists) {
+
commandSender.sendPartitionMetadataResponse(0, requestId);
+ lookupSemaphore.release();
+ return;
+ }
+
writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.TopicNotFound,
"", requestId));
+ lookupSemaphore.release();
+ return;
+ }).exceptionally(ex -> {
+ log.error("{} {} Failed to get
partition metadata", topicName,
+ ServerCnx.this.toString(),
ex);
+ writeAndFlush(
+
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+ "Failed to check
partition metadata",
+ requestId));
+ lookupSemaphore.release();
+ return null;
+ });
+ }
Review Comment:
Do we need to release the `lookupSemaphore` when handling a non-persistent
topic?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]