Technoboy- commented on code in PR #19166: URL: https://github.com/apache/pulsar/pull/19166#discussion_r1073060471
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java: ########## @@ -408,89 +409,160 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. * - * Already created partitioned producers and consumers can't see newly created partitions and it requires to - * recreate them at application so, newly created producers and consumers can connect to newly added partitions as - * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. - * - * @param expectPartitions - * @param updateLocalTopicOnly - * @param authoritative - * @param force + * @exception RestException Unprocessable entity, status code: 422. throw it when some pre-check failed. + * @exception RestException Internal Server Error, status code: 500. 
throw it when get unknown Exception */ - protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int expectPartitions, - boolean updateLocalTopicOnly, - boolean authoritative, boolean force) { - if (expectPartitions <= 0) { - return FutureUtil.failedFuture( - new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0")); - } - return validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> - validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, PolicyOperation.WRITE)) - .thenCompose(__ -> { - if (!updateLocalTopicOnly && !force) { - return validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions); - } else { - return CompletableFuture.completedFuture(null); + protected @Nonnull CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int expectPartitions, + boolean updateLocal, + boolean force) { + PulsarService pulsarService = pulsar(); + return pulsarService.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(partitionedTopicMetadata -> { + int currentMetadataPartitions = partitionedTopicMetadata.partitions; + if (currentMetadataPartitions <= 0) { + throw new RestException(422 /* Unprocessable entity*/, + String.format("Topic %s is not the partitioned topic.", topicName)); } - }).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)) - .thenCompose(topicMetadata -> { - final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - if (maxPartitions > 0 && expectPartitions > maxPartitions) { - throw new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be less than or equal to " + maxPartitions); + if (expectPartitions < currentMetadataPartitions) { + throw new RestException(422 /* Unprocessable entity*/, + String.format("Expect partitions %s can't less than current partitions %s.", + expectPartitions, currentMetadataPartitions)); } - final PulsarAdmin 
adminClient; + int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration() + .getMaxNumPartitionsPerPartitionedTopic(); + if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { Review Comment: Changing `brokerMaximumPartitionsPerTopic != 0` to `brokerMaximumPartitionsPerTopic > 0` seems better: it matches the removed check (`maxPartitions > 0`), whereas with `!= 0` a negative configured value would be treated as a limit that every `expectPartitions` exceeds, so the update would always be rejected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org