This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 12c75bdbb6f [fix][broker]fail to update partition meta of topic due to 
ConflictException: subscription already exists for topic (#17488)
12c75bdbb6f is described below

commit 12c75bdbb6fc2b798cb43576856d5c2bcf9f3245
Author: Qiang Huang <[email protected]>
AuthorDate: Wed Sep 7 15:58:35 2022 +0800

    [fix][broker]fail to update partition meta of topic due to 
ConflictException: subscription already exists for topic (#17488)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 26 +++++++++++++++++-----
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  8 +++++++
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e677746bfeb..085404e755a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -442,7 +442,8 @@ public class PersistentTopicsBase extends AdminResource {
             }
             try {
                 
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, 
TimeUnit.SECONDS);
-                createSubscriptions(topicName, 
numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+                createSubscriptions(topicName, numPartitions, 
force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
+                        TimeUnit.SECONDS);
             } catch (Exception e) {
                 if (e.getCause() instanceof RestException) {
                     throw (RestException) e.getCause();
@@ -4127,7 +4128,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
+        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
             CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
                     .updatePartitionedTopicAsync(topicName, p -> new 
PartitionedTopicMetadata(numPartitions));
             future.exceptionally(ex -> {
@@ -4164,8 +4165,10 @@ public class PersistentTopicsBase extends AdminResource {
      *
      * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
      * @param numPartitions : number partitions for the topics
+     * @param ignoreConflictException : If true, ignore ConflictException: 
subscription already exists for topic
      */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int numPartitions) {
+    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int numPartitions,
+                                                        boolean 
ignoreConflictException) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
 -> {
             if (partitionMetadata.partitions < 1) {
@@ -4200,9 +4203,20 @@ public class PersistentTopicsBase extends AdminResource {
 
                     for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
                         final String topicNamePartition = 
topicName.getPartition(i).toString();
-
-                        
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
-                                subscription, MessageId.latest));
+                        CompletableFuture<Void> future = new 
CompletableFuture<>();
+                        
admin.topics().createSubscriptionAsync(topicNamePartition,
+                                subscription, 
MessageId.latest).whenComplete((__, ex) -> {
+                            if (ex == null) {
+                                future.complete(null);
+                            } else {
+                                if (ignoreConflictException && ex instanceof 
PulsarAdminException.ConflictException) {
+                                    future.complete(null);
+                                } else {
+                                    future.completeExceptionally(ex);
+                                }
+                            }
+                        });
+                        subscriptionFutures.add(future);
                     }
                 });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 62aed23ea4c..3cb67eada5e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2464,9 +2464,17 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException.PreconditionFailedException e) {
             // Ok
         }
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 startPartitions);
         admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, true);
         // validate subscription is created for new partition.
         assertNotNull(admin.topics().getStats(partitionedTopicName + 
"-partition-" + 6).getSubscriptions().get(subName1));
+        for (int i = startPartitions; i < newPartitions; i++) {
+            assertNotNull(
+                    admin.topics().getStats(partitionedTopicName + 
"-partition-" + i).getSubscriptions().get(subName1));
+        }
+
+        // validate that the partition update succeeded
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 newPartitions);
     }
 
     @Test(dataProvider = "topicType")

Reply via email to