mattisonchao commented on code in PR #19086:
URL: https://github.com/apache/pulsar/pull/19086#discussion_r1064360493
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java:
##########
@@ -2676,15 +2676,12 @@ public void testFailedUpdatePartitionedTopic() throws
Exception {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
startPartitions);
// create a subscription for few new partition which can fail
- admin.topics().createSubscription(partitionedTopicName + "-partition-"
+ startPartitions, subName1,
- MessageId.earliest);
-
try {
- admin.topics().updatePartitionedTopic(partitionedTopicName,
newPartitions, false, false);
- } catch (PulsarAdminException.PreconditionFailedException e) {
- // Ok
+ admin.topics().createSubscription(partitionedTopicName +
"-partition-" + startPartitions, subName1,
+ MessageId.earliest);
Review Comment:
Fixed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -411,67 +411,84 @@ protected CompletableFuture<Void>
internalCreateNonPartitionedTopicAsync(boolean
* recreate them at application so, newly created producers and consumers
can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until
all producers are restarted at application.
*
- * @param numPartitions
+ * @param expectPartitions
* @param updateLocalTopicOnly
* @param authoritative
* @param force
*/
- protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
numPartitions,
+ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
expectPartitions,
boolean updateLocalTopicOnly,
boolean authoritative, boolean force) {
- if (numPartitions <= 0) {
- return FutureUtil.failedFuture(new
RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be more than 0"));
+ if (expectPartitions <= 0) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be more than 0"));
}
return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION,
- PolicyOperation.WRITE))
+ .thenCompose(__ ->
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION, PolicyOperation.WRITE))
.thenCompose(__ -> {
if (!updateLocalTopicOnly && !force) {
- return
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+ return
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(topicMetadata -> {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- if (maxPartitions > 0 && numPartitions > maxPartitions) {
+ if (maxPartitions > 0 && expectPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal
to " + maxPartitions);
}
- // Only do the validation if it's the first hop.
- if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
- return
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
- .thenApply(clusters -> {
- if
(!clusters.contains(pulsar().getConfig().getClusterName())) {
- log.error("[{}] local cluster is not part
of replicated cluster for namespace {}",
- clientAppId(), topicName);
- throw new RestException(Status.FORBIDDEN,
"Local cluster is not part of replicate"
- + " cluster list");
- }
- return clusters;
- })
- .thenCompose(clusters ->
-
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
- .thenApply(ignore -> clusters))
- .thenCompose(clusters ->
createSubscriptions(topicName, numPartitions, force).thenApply(
- ignore -> clusters))
- .thenCompose(clusters -> {
- if (!updateLocalTopicOnly) {
- return
updatePartitionInOtherCluster(numPartitions, clusters)
- .thenCompose(v ->
namespaceResources().getPartitionedTopicResources()
-
.updatePartitionedTopicAsync(topicName, p ->
- new
PartitionedTopicMetadata(numPartitions,
-
p.properties)
- ));
- } else {
- return
CompletableFuture.completedFuture(null);
- }
- });
- } else {
- return
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
- .thenCompose(ignore ->
updatePartitionedTopic(topicName, numPartitions, force));
+ final PulsarAdmin adminClient;
+ try {
+ adminClient = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ return
adminClient.topics().getListAsync(topicName.getNamespace())
+ .thenCompose(topics -> {
+ long existPartitions = topics.stream()
+ .filter(t ->
t.startsWith(topicName.getPartitionedTopicName()))
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]