codelipenghui commented on a change in pull request #11683:
URL: https://github.com/apache/pulsar/pull/11683#discussion_r691794699



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3636,6 +3636,21 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
                         p -> new PartitionedTopicMetadata(numPartitions));
                 updatePartition.complete(null);
             } catch (Exception e) {
+                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+                    int oldPartition = metadata.partitions;
+                    for (int i = oldPartition; i < numPartitions; i++) {
+                        String managedLedgerPath = 
ZkAdminPaths.managedLedgerPath(topicName.getPartition(i));
+                        namespaceResources().getPartitionedTopicResources()
+                                
.deleteAsync(managedLedgerPath).exceptionally(ex1 -> {
+                            log.error("[{}] Failed to delete managedLedger 
znode {}", clientAppId(),
+                                    managedLedgerPath, ex1.getCause());
+                            return null;
+                        });
+                    }
+                }).exceptionally(ex -> {
+                    updatePartition.completeExceptionally(e);

Review comment:
       We don't need to complete the future again here since line:3654 already 
completes the future.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3636,6 +3636,21 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
                         p -> new PartitionedTopicMetadata(numPartitions));
                 updatePartition.complete(null);
             } catch (Exception e) {
+                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+                    int oldPartition = metadata.partitions;
+                    for (int i = oldPartition; i < numPartitions; i++) {
+                        String managedLedgerPath = 
ZkAdminPaths.managedLedgerPath(topicName.getPartition(i));
+                        namespaceResources().getPartitionedTopicResources()
+                                
.deleteAsync(managedLedgerPath).exceptionally(ex1 -> {
+                            log.error("[{}] Failed to delete managedLedger 
znode {}", clientAppId(),

Review comment:
       We should use warn LEVEL here since we are trying to delete the 
partitions not guaranteed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to