This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 16a39e6abf2 [improve][broker]Only create extended partitions when
updating partition number (#17349)
16a39e6abf2 is described below
commit 16a39e6abf2c8f61a2e50d3201183122afa40801
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Sep 13 10:25:34 2022 +0800
[improve][broker]Only create extended partitions when updating partition
number (#17349)
---
.../org/apache/pulsar/broker/admin/AdminResource.java | 15 +++++++++++++++
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 11 ++++++-----
.../apache/pulsar/broker/admin/v1/PersistentTopics.java | 3 ++-
.../apache/pulsar/broker/admin/v2/PersistentTopics.java | 3 ++-
.../apache/pulsar/broker/admin/PersistentTopicsTest.java | 16 ++++++++++++++++
5 files changed, 41 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3a779845168..1b05df826c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -162,6 +162,21 @@ public abstract class AdminResource extends
PulsarWebResource {
return FutureUtil.waitForAll(futures);
}
+ protected CompletableFuture<Void> tryCreateExtendedPartitionsAsync(int
oldNumPartitions, int numPartitions) {
+ if (!topicName.isPersistent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (numPartitions <= oldNumPartitions) {
+ return CompletableFuture.failedFuture(new
RestException(Status.NOT_ACCEPTABLE,
+ "Number of new partitions must be greater than existing
number of partitions"));
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions
- oldNumPartitions);
+ for (int i = oldNumPartitions; i < numPartitions; i++) {
+ futures.add(tryCreatePartitionAsync(i));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+
private CompletableFuture<Void> tryCreatePartitionAsync(final int
partition) {
CompletableFuture<Void> result = new CompletableFuture<>();
getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition))
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 274a2adf3a4..b477a3dc1d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -464,8 +464,8 @@ public class PersistentTopicsBase extends AdminResource {
} else {
return CompletableFuture.completedFuture(null);
}
- })
- .thenCompose(__ -> {
+ }).thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
+ .thenCompose(topicMetadata -> {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
@@ -483,8 +483,9 @@ public class PersistentTopicsBase extends AdminResource {
}
return clusters;
})
- .thenCompose(clusters ->
tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
- clusters))
+ .thenCompose(clusters ->
+
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
+ .thenApply(ignore -> clusters))
.thenCompose(clusters ->
createSubscriptions(topicName, numPartitions, force).thenApply(
ignore -> clusters))
.thenCompose(clusters -> {
@@ -500,7 +501,7 @@ public class PersistentTopicsBase extends AdminResource {
}
});
} else {
- return tryCreatePartitionsAsync(numPartitions)
+ return
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
.thenCompose(ignore ->
updatePartitionedTopic(topicName, numPartitions, force));
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index df32386152a..8e3f3adbfee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -273,7 +273,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions
should be more than 0"
- + " and less than or equal to
maxNumPartitionsPerPartitionedTopic"),
+ + " and less than or equal to
maxNumPartitionsPerPartitionedTopic"
+ + " and number of new partitions must be greater than
existing number of partitions"),
@ApiResponse(code = 409, message = "Partitioned topic does not
exist")})
public void updatePartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f283bb7aa5c..742f27161f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -832,7 +832,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and"
- + " less than or equal to
maxNumPartitionsPerPartitionedTopic"),
+ + " less than or equal to
maxNumPartitionsPerPartitionedTopic"
+ + " and number of new partitions must be greater than
existing number of partitions"),
@ApiResponse(code = 409, message = "Partitioned topic does not
exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
@ApiResponse(code = 500, message = "Internal server error")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 29d81d609b4..e4097b68e88 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1600,5 +1600,21 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+ // check number of new partitions must be greater than existing number
of partitions
+ response = mock(AsyncResponse.class);
+ ArgumentCaptor<Throwable> throwableCaptor =
ArgumentCaptor.forClass(Throwable.class);
+ persistentTopics.updatePartitionedTopic(response, testTenant,
testNamespaceLocal, topicName, false, true,
+ true, 3);
+ verify(response,
timeout(5000).times(1)).resume(throwableCaptor.capture());
+ Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+ "Number of new partitions must be greater than existing number
of partitions");
+
+ response = mock(AsyncResponse.class);
+ metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response, testTenant,
testNamespaceLocal, topicName, true, false);
+ verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+ partitionedTopicMetadata = metaCaptor.getValue();
+ Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
}
}