This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d7c7d0db52 [improve][broker] Refactor update topic partitions 
endpoint. (#19166)
4d7c7d0db52 is described below

commit 4d7c7d0db5204f4ad52fd5858394cd9455f3837d
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jan 18 16:34:36 2023 +0800

    [improve][broker] Refactor update topic partitions endpoint. (#19166)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  15 -
 .../broker/admin/impl/PersistentTopicsBase.java    | 348 +++++++++------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  38 +--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  35 +--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   2 +-
 5 files changed, 186 insertions(+), 252 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1632831b634..f00717377ef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -165,21 +165,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return FutureUtil.waitForAll(futures);
     }
 
-    protected CompletableFuture<Void> tryCreateExtendedPartitionsAsync(int 
oldNumPartitions, int numPartitions) {
-        if (!topicName.isPersistent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        if (numPartitions <= oldNumPartitions) {
-            return CompletableFuture.failedFuture(new 
RestException(Status.NOT_ACCEPTABLE,
-                    "Number of new partitions must be greater than existing 
number of partitions"));
-        }
-        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions 
- oldNumPartitions);
-        for (int i = oldNumPartitions; i < numPartitions; i++) {
-            futures.add(tryCreatePartitionAsync(i));
-        }
-        return FutureUtil.waitForAll(futures);
-    }
-
     private CompletableFuture<Void> tryCreatePartitionAsync(final int 
partition) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         
getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition))
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 50724142b0c..73f25914d7a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -23,6 +23,7 @@ import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInte
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.github.zafarkhaja.semver.Version;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import javax.annotation.Nonnull;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
@@ -129,7 +131,6 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -408,89 +409,160 @@ public class PersistentTopicsBase extends AdminResource {
      * already exist and number of new partitions must be greater than 
existing number of partitions. Decrementing
      * number of partitions requires deletion of topic which is not supported.
      *
-     * Already created partitioned producers and consumers can't see newly 
created partitions and it requires to
-     * recreate them at application so, newly created producers and consumers 
can connect to newly added partitions as
-     * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
-     *
-     * @param expectPartitions
-     * @param updateLocalTopicOnly
-     * @param authoritative
-     * @param force
+     * @exception RestException Unprocessable entity, status code: 422. throw 
it when some pre-check failed.
+     * @exception RestException Internal Server Error, status code: 500. throw 
it when get unknown Exception
      */
-    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
expectPartitions,
-                                                                          
boolean updateLocalTopicOnly,
-                                                                          
boolean authoritative, boolean force) {
-        if (expectPartitions <= 0) {
-            return FutureUtil.failedFuture(
-                    new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0"));
-        }
-        return validateTopicOwnershipAsync(topicName, authoritative)
-            .thenCompose(__ ->
-                    validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION, PolicyOperation.WRITE))
-            .thenCompose(__ -> {
-                if (!updateLocalTopicOnly && !force) {
-                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
-                }  else {
-                    return CompletableFuture.completedFuture(null);
+    protected @Nonnull CompletableFuture<Void> 
internalUpdatePartitionedTopicAsync(int expectPartitions,
+                                                                               
    boolean updateLocal,
+                                                                               
    boolean force) {
+        PulsarService pulsarService = pulsar();
+        return 
pulsarService.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+            .thenCompose(partitionedTopicMetadata -> {
+                int currentMetadataPartitions = 
partitionedTopicMetadata.partitions;
+                if (currentMetadataPartitions <= 0) {
+                    throw new RestException(422 /* Unprocessable entity*/,
+                            String.format("Topic %s is not the partitioned 
topic.", topicName));
                 }
-            }).thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
-            .thenCompose(topicMetadata -> {
-                final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-                if (maxPartitions > 0 && expectPartitions > maxPartitions) {
-                    throw new RestException(Status.NOT_ACCEPTABLE,
-                            "Number of partitions should be less than or equal 
to " + maxPartitions);
+                if (expectPartitions < currentMetadataPartitions) {
+                    throw new RestException(422 /* Unprocessable entity*/,
+                            String.format("Expect partitions %s can't less 
than current partitions %s.",
+                                    expectPartitions, 
currentMetadataPartitions));
                 }
-                final PulsarAdmin adminClient;
+                int brokerMaximumPartitionsPerTopic = 
pulsarService.getConfiguration()
+                        .getMaxNumPartitionsPerPartitionedTopic();
+                if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > 
brokerMaximumPartitionsPerTopic) {
+                    throw new RestException(422 /* Unprocessable entity*/,
+                            String.format("Expect partitions %s grater than 
maximum partitions per topic %s",
+                                    expectPartitions, 
brokerMaximumPartitionsPerTopic));
+                }
+                final PulsarAdmin admin;
                 try {
-                    adminClient = pulsar().getAdminClient();
-                } catch (PulsarServerException e) {
-                    throw new RuntimeException(e);
+                    admin = pulsarService.getAdminClient();
+                } catch (PulsarServerException ex) {
+                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
Throwables.getRootCause(ex));
                 }
-                return 
adminClient.topics().getListAsync(topicName.getNamespace())
-                        .thenCompose(topics -> {
-                            long existPartitions = topics.stream()
-                                    .filter(t -> 
TopicName.get(t).getPartitionedTopicName()
-                                            
.equals(topicName.getPartitionedTopicName()))
-                                    .count();
-                            if (existPartitions >= expectPartitions) {
-                                throw new RestException(Status.CONFLICT,
-                                        "Number of new partitions must be 
greater than existing number of partitions");
-                            }
-                            // Only do the validation if it's the first hop.
-                            if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
-                                return 
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
-                                        .thenApply(clusters -> {
-                                            if 
(!clusters.contains(pulsar().getConfig().getClusterName())) {
-                                                log.error("[{}] local cluster 
is not part of"
-                                                                + " replicated 
cluster for namespace {}",
-                                                        clientAppId(), 
topicName);
-                                                throw new 
RestException(Status.FORBIDDEN,
-                                                        "Local cluster is not 
part of replicate cluster list");
-                                            }
-                                            return clusters;
-                                        })
-                                        .thenCompose(clusters ->
-                                                
tryCreatePartitionsAsync(expectPartitions)
-                                                        .thenApply(ignore -> 
clusters))
-                                        .thenCompose(clusters -> {
-                                            if (!updateLocalTopicOnly) {
-                                                return 
namespaceResources().getPartitionedTopicResources()
-                                                        
.updatePartitionedTopicAsync(topicName, p ->
-                                                                new 
PartitionedTopicMetadata(expectPartitions,
-                                                                        
p.properties))
-                                                        .thenCompose(__ ->
-                                                                
updatePartitionInOtherCluster(expectPartitions,
-                                                                        
clusters));
-                                            } else {
-                                                return 
CompletableFuture.completedFuture(null);
-                                            }
-                                        }).thenCompose(clusters -> 
createSubscriptions(topicName,
-                                                expectPartitions));
-                            } else {
-                                return 
tryCreatePartitionsAsync(expectPartitions)
-                                        .thenCompose(ignore -> 
updatePartitionedTopic(topicName, expectPartitions));
-                            }
-                        });
+                final CompletableFuture<Void> checkFuture;
+                if (!force) {
+                    checkFuture = 
admin.topics().getListAsync(topicName.getNamespace(), topicName.getDomain())
+                            .thenAccept(topics -> {
+                                final List<TopicName> existingPartitions = 
topics.stream()
+                                        .map(TopicName::get)
+                                        .filter(candidateTopicName -> 
candidateTopicName.getPartitionedTopicName()
+                                                
.equals(topicName.getPartitionedTopicName()))
+                                        .collect(Collectors.toList());
+                                    // Check whether exist unexpected partition
+                                    Optional<Integer> maximumPartitionIndex = 
existingPartitions.stream()
+                                            .map(TopicName::getPartitionIndex)
+                                            .max(Integer::compareTo);
+                                    if (maximumPartitionIndex.isPresent()
+                                            && maximumPartitionIndex.get() >= 
currentMetadataPartitions) {
+                                        List<String> unexpectedPartitions = 
existingPartitions.stream()
+                                                .filter(candidateTopicName ->
+                                                        candidateTopicName
+                                                                
.getPartitionIndex() > currentMetadataPartitions)
+                                                .map(TopicName::toString)
+                                                .collect(Collectors.toList());
+                                        throw new 
RestException(Status.CONFLICT,
+                                                String.format(
+                                                        "Exist unexpected 
topic partition(partition index grater than"
+                                                                + " current 
metadata maximum index %s) %s ",
+                                                        
currentMetadataPartitions,
+                                                        
StringUtils.join(unexpectedPartitions, ",")));
+                                    }
+                            });
+                } else {
+                    checkFuture = CompletableFuture.completedFuture(null);
+                }
+                return checkFuture.thenCompose(topics -> {
+                    final CompletableFuture<Void> updateMetadataFuture = 
(expectPartitions == currentMetadataPartitions)
+                            // current metadata partitions is equals to expect 
partitions
+                            ? CompletableFuture.completedFuture(null)
+                            // update current cluster topic metadata
+                            : 
namespaceResources().getPartitionedTopicResources()
+                            .updatePartitionedTopicAsync(topicName, m ->
+                                    new 
PartitionedTopicMetadata(expectPartitions, m.properties));
+                return updateMetadataFuture
+                    // create missing partitions
+                    .thenCompose(__ -> 
tryCreatePartitionsAsync(expectPartitions))
+                    // because we should consider the compatibility.
+                    // Copy subscriptions from partition 0 instead of being 
created by the customer
+                    .thenCompose(__ ->
+                            
admin.topics().getStatsAsync(topicName.getPartition(0).toString())
+                                    .thenCompose(stats -> {
+                                        List<CompletableFuture<Void>> futures 
= stats.getSubscriptions().entrySet()
+                                                // We must not re-create 
non-durable subscriptions on the new partitions
+                                                .stream().filter(entry -> 
entry.getValue().isDurable())
+                                                .map(entry -> {
+                                                    final 
List<CompletableFuture<Void>> innerFutures =
+                                                            new 
ArrayList<>(expectPartitions);
+                                                    for (int i = 0; i < 
expectPartitions; i++) {
+                                                        
innerFutures.add(admin.topics().createSubscriptionAsync(
+                                                                        
topicName.getPartition(i).toString(),
+                                                                        
entry.getKey(), MessageId.earliest,
+                                                                        
entry.getValue().isReplicated(),
+                                                                        
entry.getValue().getSubscriptionProperties())
+                                                                
.exceptionally(ex -> {
+                                                                    Throwable 
rc =
+                                                                            
FutureUtil.unwrapCompletionException(ex);
+                                                                    if (!(rc 
instanceof PulsarAdminException
+                                                                            
.ConflictException)) {
+                                                                        
log.warn("[{}] got an error while copying"
+                                                                               
         + " the subscription to the"
+                                                                               
         + " partition {}.", topicName,
+                                                                               
 Throwables.getRootCause(rc));
+                                                                        throw 
FutureUtil.wrapToCompletionException(rc);
+                                                                    }
+                                                                    // Ignore 
subscription already exist exception
+                                                                    return 
null;
+                                                                }));
+                                                    }
+                                                    return 
FutureUtil.waitForAll(innerFutures);
+                                                
}).collect(Collectors.toList());
+                                        return FutureUtil.waitForAll(futures);
+                                    })
+                    );
+                }).thenCompose(__ -> {
+                    if (updateLocal || !topicName.isGlobal()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    // update remote cluster
+                    return namespaceResources().getPoliciesAsync(namespaceName)
+                            .thenCompose(policies -> {
+                                if (!policies.isPresent()) {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                final Set<String> replicationClusters = 
policies.get().replication_clusters;
+                                if (replicationClusters.size() == 0) {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                boolean containsCurrentCluster =
+                                        
replicationClusters.contains(pulsar().getConfig().getClusterName());
+                                if (!containsCurrentCluster) {
+                                    log.error("[{}] local cluster is not part 
of replicated cluster for namespace {}",
+                                            clientAppId(), topicName);
+                                    throw new RestException(422,
+                                            "Local cluster is not part of 
replicate cluster list");
+                                }
+                                if (replicationClusters.size() == 1) {
+                                    // The replication clusters just has the 
current cluster itself.
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                List<CompletableFuture<Void>> futures = 
replicationClusters.stream()
+                                        .map(replicationCluster -> 
admin.clusters().getClusterAsync(replicationCluster)
+                                                .thenCompose(clusterData -> 
pulsarService.getBrokerService()
+                                                    
.getClusterPulsarAdmin(replicationCluster, Optional.of(clusterData))
+                                                        
.topics().updatePartitionedTopicAsync(topicName.toString(),
+                                                            expectPartitions, 
true, force)
+                                                        .exceptionally(ex -> {
+                                                            log.warn("[{}][{}] 
Update remote cluster partition fail.",
+                                                                        
topicName, replicationCluster, ex);
+                                                            throw 
FutureUtil.wrapToCompletionException(ex);
+                                                        })
+                                                )
+                                        ).collect(Collectors.toList());
+                                return FutureUtil.waitForAll(futures);
+                            });
+                });
             });
     }
 
@@ -529,24 +601,6 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    private CompletableFuture<Void> updatePartitionInOtherCluster(int 
numPartitions, Set<String> clusters) {
-        List<CompletableFuture<Void>> results = new 
ArrayList<>(clusters.size() - 1);
-        clusters.forEach(cluster -> {
-            if (cluster.equals(pulsar().getConfig().getClusterName())) {
-                return;
-            }
-            CompletableFuture<Void> updatePartitionTopicFuture =
-                
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
-                    .thenApply(clusterDataOp ->
-                            
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
-                    .thenCompose(pulsarAdmin ->
-                            pulsarAdmin.topics().updatePartitionedTopicAsync(
-                                    topicName.toString(), numPartitions, true, 
false));
-            results.add(updatePartitionTopicFuture);
-        });
-        return FutureUtil.waitForAll(results);
-    }
-
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean 
authoritative,
                                                                       boolean 
checkAllowAutoCreation) {
         return sync(() -> internalGetPartitionedMetadataAsync(authoritative, 
checkAllowAutoCreation));
@@ -4389,106 +4443,6 @@ public class PersistentTopicsBase extends AdminResource 
{
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
-        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
-        future.exceptionally(ex -> {
-            // If the update operation fails, clean up the partitions that 
were created
-            getPartitionedTopicMetadataAsync(topicName, false, false)
-                    .thenAccept(metadata -> {
-                int oldPartition = metadata.partitions;
-                for (int i = oldPartition; i < expectPartitions; i++) {
-                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
-                                ex1.getCause());
-                        return null;
-                    });
-                }
-            }).exceptionally(e -> {
-                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
-                return null;
-            });
-            return null;
-        });
-        return future.thenCompose(__ -> createSubscriptions(topicName, 
expectPartitions));
-    }
-
-    /**
-     * It creates subscriptions for new partitions of existing 
partitioned-topics.
-     *
-     * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
-     * @param expectPartitions : number of expected partitions
-     *
-     */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int expectPartitions) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        if (expectPartitions < 1) {
-            return FutureUtil.failedFuture(new RestException(Status.CONFLICT, 
"Topic is not partitioned topic"));
-        }
-        PulsarAdmin admin;
-        try {
-            admin = pulsar().getAdminClient();
-        } catch (PulsarServerException e1) {
-            return FutureUtil.failedFuture(e1);
-        }
-
-        
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
-            List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
-
-            stats.getSubscriptions().entrySet().forEach(e -> {
-                String subscription = e.getKey();
-                SubscriptionStats ss = e.getValue();
-                if (!ss.isDurable()) {
-                    // We must not re-create non-durable subscriptions on the 
new partitions
-                    return;
-                }
-                boolean replicated = ss.isReplicated();
-
-                for (int i = 0; i < expectPartitions; i++) {
-                    final String topicNamePartition = 
topicName.getPartition(i).toString();
-                    CompletableFuture<Void> future = new CompletableFuture<>();
-                    admin.topics().createSubscriptionAsync(topicNamePartition,
-                                    subscription, MessageId.earliest, 
replicated, ss.getSubscriptionProperties())
-                            .whenComplete((__, ex) -> {
-                        if (ex == null) {
-                            future.complete(null);
-                        } else {
-                            Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
-                            if (realCause instanceof 
PulsarAdminException.ConflictException) {
-                                future.complete(null);
-                            } else {
-                                future.completeExceptionally(realCause);
-                            }
-                        }
-                    });
-                    subscriptionFutures.add(future);
-                }
-            });
-
-            FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
-                log.info("[{}] Successfully created subscriptions on new 
partitions {}", clientAppId(), topicName);
-                result.complete(null);
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to create subscriptions on new 
partitions for {}",
-                        clientAppId(), topicName, ex);
-                result.completeExceptionally(ex);
-                return null;
-            });
-        }).exceptionally(ex -> {
-            if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
-                // The first partition doesn't exist, so there are currently 
to subscriptions to recreate
-                result.complete(null);
-            } else {
-                log.warn("[{}] Failed to get list of subscriptions of {}",
-                        clientAppId(), topicName.getPartition(0), ex);
-                result.completeExceptionally(ex);
-            }
-            return null;
-        });
-        return result;
-    }
-
     // as described at : (PR: #836) CPP-client old client lib should not be 
allowed to connect on partitioned-topic.
     // So, all requests from old-cpp-client (< v1.21) must be rejected.
     // Pulsar client-java lib always passes user-agent as X-Java-$version.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fe9ced198f4..16f1b357dcd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -47,6 +47,8 @@ import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
@@ -257,43 +259,43 @@ public class PersistentTopics extends 
PersistentTopicsBase {
      * It updates number of partitions of an existing non-global partitioned 
topic. It requires partitioned-topic to be
      * already exist and number of new partitions must be greater than 
existing number of partitions. Decrementing
      * number of partitions requires deletion of topic which is not supported.
-     *
-     * Already created partitioned producers and consumers can't see newly 
created partitions and it requires to
-     * recreate them at application so, newly created producers and consumers 
can connect to newly added partitions as
-     * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
-     *
-     * @param property
-     * @param cluster
-     * @param namespace
-     * @param numPartitions
      */
     @POST
     @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
     @ApiOperation(hidden = true, value = "Increment partitions of an existing 
partitioned topic.",
             notes = "It only increments partitions of existing non-global 
partitioned-topic")
     @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Update topic partition 
successful."),
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
-            @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Namespace or topic does not 
exist"),
-            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0"
-                    + " and less than or equal to 
maxNumPartitionsPerPartitionedTopic"
+            @ApiResponse(code = 401, message = "Unauthenticated"),
+            @ApiResponse(code = 403, message = "Forbidden/Unauthorized"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 422, message = "The number of partitions 
should be more than 0 and"
+                    + " less than or equal to 
maxNumPartitionsPerPartitionedTopic"
                     + " and number of new partitions must be greater than 
existing number of partitions"),
-            @ApiResponse(code = 409, message = "Partitioned topic does not 
exist")})
+            @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     public void updatePartitionedTopic(
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
-            @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopicOnly,
+            @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopic,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopicAsync(numPartitions, 
updateLocalTopicOnly, authoritative, force)
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> 
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force))
+                .thenAccept(__ -> {
+                    log.info("[{}][{}] Updated topic partition to {}.", 
clientAppId(), topicName, numPartitions);
+                    asyncResponse.resume(Response.noContent().build());
+                })
                 .exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to update partitioned topic 
{}", clientAppId(), topicName, ex);
+                        log.error("[{}][{}] Failed to update partition to {}",
+                                clientAppId(), topicName, numPartitions, ex);
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 796bd2bb15b..b25e4d17353 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -778,30 +778,20 @@ public class PersistentTopics extends 
PersistentTopicsBase {
      * It updates number of partitions of an existing non-global partitioned 
topic. It requires partitioned-topic to be
      * already exist and number of new partitions must be greater than 
existing number of partitions. Decrementing
      * number of partitions requires deletion of topic which is not supported.
-     *
-     * Already created partitioned producers and consumers can't see newly 
created partitions and it requires to
-     * recreate them at application so, newly created producers and consumers 
can connect to newly added partitions as
-     * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
-     *
-     * @param tenant
-     * @param namespace
-     * @param encodedTopic
-     * @param numPartitions
      */
     @POST
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Increment partitions of an existing partitioned 
topic.",
             notes = "It only increments partitions of existing non-global 
partitioned-topic")
     @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Update topic partition 
successful."),
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
-            @ApiResponse(code = 401,
-                    message = "Don't have permission to administrate resources 
on this tenant"),
-            @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Tenant does not exist"),
-            @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and"
+            @ApiResponse(code = 401, message = "Unauthenticated"),
+            @ApiResponse(code = 403, message = "Forbidden/Unauthorized"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 422, message = "The number of partitions 
should be more than 0 and"
                     + " less than or equal to 
maxNumPartitionsPerPartitionedTopic"
                     + " and number of new partitions must be greater than 
existing number of partitions"),
-            @ApiResponse(code = 409, message = "Partitioned topic does not 
exist"),
             @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
             @ApiResponse(code = 500, message = "Internal server error")
     })
@@ -813,7 +803,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopicOnly,
+            @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopic,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
@@ -821,13 +811,16 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
         validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        validatePartitionedTopicMetadataAsync()
-                .thenCompose(__ -> 
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly,
-                        authoritative, force))
-                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+        validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> 
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force))
+                .thenAccept(__ -> {
+                    log.info("[{}][{}] Updated topic partition to {}.", 
clientAppId(), topicName, numPartitions);
+                    asyncResponse.resume(Response.noContent().build());
+                })
                 .exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to update partitioned topic 
{}", clientAppId(), topicName, ex);
+                        log.error("[{}][{}] Failed to update partition to {}",
+                                clientAppId(), topicName, numPartitions, ex);
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 875bf629d4f..6b68b6b2bf6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1637,7 +1637,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
                 true, 3);
         verify(response, 
timeout(5000).times(1)).resume(throwableCaptor.capture());
         Assert.assertEquals(throwableCaptor.getValue().getMessage(),
-                "Number of new partitions must be greater than existing number 
of partitions");
+                "Expect partitions 3 can't less than current partitions 4.");
 
         response = mock(AsyncResponse.class);
         metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);


Reply via email to