mattisonchao commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800464850



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -415,88 +427,66 @@ protected void internalCreateNonPartitionedTopic(boolean 
authoritative, Map<Stri
      * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
      *
      * @param numPartitions
+     * @param updateLocalTopicOnly
+     * @param authoritative
+     * @param force
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean 
updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
numPartitions,
+                                                                          
boolean updateLocalTopicOnly,
+                                                                          
boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0");
+            return FutureUtil.failedFuture(new 
RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), 
numPartitions);
-        }
-        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + 
maxPartitions);
-        }
-
-        if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = 
getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated 
cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is 
not part of replicate cluster list");
-            }
-            try {
-                
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, 
TimeUnit.SECONDS);
-                createSubscriptions(topicName, 
numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                if (!updateLocalTopicOnly && !force) {
+                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    return CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", 
clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with 
other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new 
CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, 
clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> 
updatePartition.complete(null)).exceptionally(ex -> {
-                            
updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, 
TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk 
for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
-                    }
-                    throw new RestException(e);
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal 
to " + maxPartitions);
                 }
-            }
-            return;
-        }
-
-        try {
-            
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, 
TimeUnit.SECONDS);
-            updatePartitionedTopic(topicName, numPartitions, 
force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
-                    TimeUnit.SECONDS);
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            log.error("[{}] Failed to update partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+                CompletableFuture<Void> ret;

Review comment:
       it looks unused. : (




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to