This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 12c75bdbb6f [fix][broker]fail to update partition meta of topic due to
ConflictException: subscription already exists for topic (#17488)
12c75bdbb6f is described below
commit 12c75bdbb6fc2b798cb43576856d5c2bcf9f3245
Author: Qiang Huang <[email protected]>
AuthorDate: Wed Sep 7 15:58:35 2022 +0800
[fix][broker]fail to update partition meta of topic due to
ConflictException: subscription already exists for topic (#17488)
---
.../broker/admin/impl/PersistentTopicsBase.java | 26 +++++++++++++++++-----
.../apache/pulsar/broker/admin/AdminApi2Test.java | 8 +++++++
2 files changed, 28 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e677746bfeb..085404e755a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -442,7 +442,8 @@ public class PersistentTopicsBase extends AdminResource {
}
try {
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
- createSubscriptions(topicName,
numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+ createSubscriptions(topicName, numPartitions,
force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
+ TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
@@ -4127,7 +4128,7 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int numPartitions, boolean force) {
CompletableFuture<Void> result = new CompletableFuture<>();
- createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
+ createSubscriptions(topicName, numPartitions, force).thenCompose(__ ->
{
CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p -> new
PartitionedTopicMetadata(numPartitions));
future.exceptionally(ex -> {
@@ -4164,8 +4165,10 @@ public class PersistentTopicsBase extends AdminResource {
*
* @param topicName : topic-name: persistent://prop/cluster/ns/topic
* @param numPartitions : number partitions for the topics
+ * @param ignoreConflictException : If true, ignore ConflictException:
subscription already exists for topic
*/
- private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int numPartitions) {
+ private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int numPartitions,
+ boolean
ignoreConflictException) {
CompletableFuture<Void> result = new CompletableFuture<>();
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
-> {
if (partitionMetadata.partitions < 1) {
@@ -4200,9 +4203,20 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
final String topicNamePartition =
topicName.getPartition(i).toString();
-
-
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest));
+ CompletableFuture<Void> future = new
CompletableFuture<>();
+
admin.topics().createSubscriptionAsync(topicNamePartition,
+ subscription,
MessageId.latest).whenComplete((__, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ if (ignoreConflictException && ex instanceof
PulsarAdminException.ConflictException) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(ex);
+ }
+ }
+ });
+ subscriptionFutures.add(future);
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 62aed23ea4c..3cb67eada5e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2464,9 +2464,17 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
}
+
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
startPartitions);
admin.topics().updatePartitionedTopic(partitionedTopicName,
newPartitions, false, true);
// validate subscription is created for new partition.
assertNotNull(admin.topics().getStats(partitionedTopicName +
"-partition-" + 6).getSubscriptions().get(subName1));
+ for (int i = startPartitions; i < newPartitions; i++) {
+ assertNotNull(
+ admin.topics().getStats(partitionedTopicName +
"-partition-" + i).getSubscriptions().get(subName1));
+ }
+
+ // validate update partition is success
+
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
newPartitions);
}
@Test(dataProvider = "topicType")