mattisonchao commented on code in PR #19086:
URL: https://github.com/apache/pulsar/pull/19086#discussion_r1064355690


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4363,124 +4380,100 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
-                            return null;
-                        });
-                    }
-                }).exceptionally(e -> {
-                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
-                    return null;
-                });
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < expectPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
+                                ex1.getCause());
+                        return null;
+                    });
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
                 return null;
             });
-            return future;
-        }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
-            result.completeExceptionally(ex);
             return null;
         });
-        return result;
+        return future.thenCompose(__ -> createSubscriptions(topicName, 
expectPartitions));
     }
 
     /**
      * It creates subscriptions for new partitions of existing 
partitioned-topics.
      *
      * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
-     * @param numPartitions : number partitions for the topics
-     * @param ignoreConflictException : If true, ignore ConflictException: 
subscription already exists for topic
+     * @param expectPartitions : number of expected partitions
      *
      */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int numPartitions,
-                                                       boolean 
ignoreConflictException) {
+    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int expectPartitions) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
 -> {
-            if (partitionMetadata.partitions < 1) {
-                result.completeExceptionally(new 
RestException(Status.CONFLICT, "Topic is not partitioned topic"));
-                return;
-            }
-
-            if (partitionMetadata.partitions >= numPartitions) {
-                result.completeExceptionally(new RestException(Status.CONFLICT,
-                        "number of partitions must be more than existing " + 
partitionMetadata.partitions));
-                return;
-            }
-
-            PulsarAdmin admin;
-            try {
-                admin = pulsar().getAdminClient();
-            } catch (PulsarServerException e1) {
-                result.completeExceptionally(e1);
-                return;
-            }
+        if (expectPartitions < 1) {
+            return FutureUtil.failedFuture(new RestException(Status.CONFLICT, 
"Topic is not partitioned topic"));
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {
+            return FutureUtil.failedFuture(e1);
+        }
 
-            
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
-                List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
+        
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
+            List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
 
-                stats.getSubscriptions().entrySet().forEach(e -> {
-                    String subscription = e.getKey();
-                    SubscriptionStats ss = e.getValue();
-                    if (!ss.isDurable()) {
-                        // We must not re-create non-durable subscriptions on 
the new partitions
-                        return;
-                    }
-                    boolean replicated = ss.isReplicated();
-
-                    for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
-                        final String topicNamePartition = 
topicName.getPartition(i).toString();
-                        CompletableFuture<Void> future = new 
CompletableFuture<>();
-                        
admin.topics().createSubscriptionAsync(topicNamePartition,
-                                        subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {
-                            if (ex == null) {
+            stats.getSubscriptions().entrySet().forEach(e -> {
+                String subscription = e.getKey();
+                SubscriptionStats ss = e.getValue();
+                if (!ss.isDurable()) {
+                    // We must not re-create non-durable subscriptions on the 
new partitions
+                    return;
+                }
+                boolean replicated = ss.isReplicated();
+
+                for (int i = 0; i < expectPartitions; i++) {
+                    final String topicNamePartition = 
topicName.getPartition(i).toString();
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    admin.topics().createSubscriptionAsync(topicNamePartition,
+                                    subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {

Review Comment:
   I think I can fix this in the next refactoring PR.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4363,124 +4380,100 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
-                            return null;
-                        });
-                    }
-                }).exceptionally(e -> {
-                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
-                    return null;
-                });
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < expectPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {

Review Comment:
   I think I can fix this in the next refactoring PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to