Technoboy- commented on code in PR #19166:
URL: https://github.com/apache/pulsar/pull/19166#discussion_r1073060471
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -408,89 +409,160 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
      * already exist and number of new partitions must be greater than existing number of partitions. Decrementing
      * number of partitions requires deletion of topic which is not supported.
      *
-     * Already created partitioned producers and consumers can't see newly created partitions and it requires to
-     * recreate them at application so, newly created producers and consumers can connect to newly added partitions as
-     * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
-     *
-     * @param expectPartitions
-     * @param updateLocalTopicOnly
-     * @param authoritative
-     * @param force
+     * @exception RestException Unprocessable entity, status code: 422. throw it when some pre-check failed.
+     * @exception RestException Internal Server Error, status code: 500. throw it when get unknown Exception
      */
-    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int expectPartitions,
-                                                                          boolean updateLocalTopicOnly,
-                                                                          boolean authoritative, boolean force) {
-        if (expectPartitions <= 0) {
-            return FutureUtil.failedFuture(
-                    new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
-        }
-        return validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ ->
-                        validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, PolicyOperation.WRITE))
-                .thenCompose(__ -> {
-                    if (!updateLocalTopicOnly && !force) {
-                        return validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
+    protected @Nonnull CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int expectPartitions,
+                                                                                   boolean updateLocal,
+                                                                                   boolean force) {
+        PulsarService pulsarService = pulsar();
+        return pulsarService.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                .thenCompose(partitionedTopicMetadata -> {
+                    int currentMetadataPartitions = partitionedTopicMetadata.partitions;
+                    if (currentMetadataPartitions <= 0) {
+                        throw new RestException(422 /* Unprocessable entity*/,
+                                String.format("Topic %s is not the partitioned topic.", topicName));
                     }
-                }).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
-                .thenCompose(topicMetadata -> {
-                    final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-                    if (maxPartitions > 0 && expectPartitions > maxPartitions) {
-                        throw new RestException(Status.NOT_ACCEPTABLE,
-                                "Number of partitions should be less than or equal to " + maxPartitions);
+                    if (expectPartitions < currentMetadataPartitions) {
+                        throw new RestException(422 /* Unprocessable entity*/,
+                                String.format("Expect partitions %s can't less than current partitions %s.",
+                                        expectPartitions, currentMetadataPartitions));
                     }
-                    final PulsarAdmin adminClient;
+                    int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration()
+                            .getMaxNumPartitionsPerPartitionedTopic();
+                    if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) {
Review Comment:
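Suggest `brokerMaximumPartitionsPerTopic > 0` instead of `brokerMaximumPartitionsPerTopic != 0`.
With `!= 0`, a negative configured value is treated as an active limit, so every update would be rejected because `expectPartitions` is always greater than it. Checking `> 0` treats any non-positive value as "no limit" and matches the `maxPartitions > 0` check in the code being removed.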
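
To make the difference concrete, here is a minimal standalone sketch of the two guards. The class and method names (`PartitionLimitCheck`, `exceedsLimitNonZero`, `exceedsLimitPositive`) are hypothetical and are not part of the Pulsar codebase; only the two boolean expressions are taken from the diff and from this suggestion.

```java
// Hypothetical helper for illustration only; not Pulsar code.
class PartitionLimitCheck {

    // Guard as written in the PR: any non-zero limit is enforced,
    // including a negative one.
    static boolean exceedsLimitNonZero(int expectPartitions, int maxPartitions) {
        return maxPartitions != 0 && expectPartitions > maxPartitions;
    }

    // Guard as suggested in this comment: only a positive limit is enforced.
    static boolean exceedsLimitPositive(int expectPartitions, int maxPartitions) {
        return maxPartitions > 0 && expectPartitions > maxPartitions;
    }

    public static void main(String[] args) {
        int expectPartitions = 8;
        // Try a negative limit, a disabled limit, a limit below the request,
        // and a limit above the request.
        for (int maxPartitions : new int[] {-1, 0, 4, 16}) {
            System.out.printf("max=%d nonZero=%b positive=%b%n",
                    maxPartitions,
                    exceedsLimitNonZero(expectPartitions, maxPartitions),
                    exceedsLimitPositive(expectPartitions, maxPartitions));
        }
    }
}
```

The two guards agree for zero and for positive limits; they differ only when the configured limit is negative, where the `!= 0` form reports the limit as exceeded for any positive partition count.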