This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 16a39e6abf2 [improve][broker]Only create extended partitions when updating partition number (#17349)
16a39e6abf2 is described below

commit 16a39e6abf2c8f61a2e50d3201183122afa40801
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Sep 13 10:25:34 2022 +0800

    [improve][broker]Only create extended partitions when updating partition number (#17349)
---
 .../org/apache/pulsar/broker/admin/AdminResource.java    | 15 +++++++++++++++
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java   | 11 ++++++-----
 .../apache/pulsar/broker/admin/v1/PersistentTopics.java  |  3 ++-
 .../apache/pulsar/broker/admin/v2/PersistentTopics.java  |  3 ++-
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 16 ++++++++++++++++
 5 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3a779845168..1b05df826c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -162,6 +162,21 @@ public abstract class AdminResource extends PulsarWebResource {
         return FutureUtil.waitForAll(futures);
     }
 
+    protected CompletableFuture<Void> tryCreateExtendedPartitionsAsync(int 
oldNumPartitions, int numPartitions) {
+        if (!topicName.isPersistent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (numPartitions <= oldNumPartitions) {
+            return CompletableFuture.failedFuture(new 
RestException(Status.NOT_ACCEPTABLE,
+                    "Number of new partitions must be greater than existing 
number of partitions"));
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions 
- oldNumPartitions);
+        for (int i = oldNumPartitions; i < numPartitions; i++) {
+            futures.add(tryCreatePartitionAsync(i));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
     private CompletableFuture<Void> tryCreatePartitionAsync(final int 
partition) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         
getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition))
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 274a2adf3a4..b477a3dc1d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -464,8 +464,8 @@ public class PersistentTopicsBase extends AdminResource {
                 }  else {
                     return CompletableFuture.completedFuture(null);
                 }
-            })
-            .thenCompose(__ -> {
+            }).thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
+            .thenCompose(topicMetadata -> {
                 final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
                 if (maxPartitions > 0 && numPartitions > maxPartitions) {
                     throw new RestException(Status.NOT_ACCEPTABLE,
@@ -483,8 +483,9 @@ public class PersistentTopicsBase extends AdminResource {
                                 }
                                 return clusters;
                             })
-                            .thenCompose(clusters -> 
tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
-                                    clusters))
+                            .thenCompose(clusters ->
+                                    
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
+                                            .thenApply(ignore -> clusters))
                             .thenCompose(clusters -> 
createSubscriptions(topicName, numPartitions, force).thenApply(
                                     ignore -> clusters))
                             .thenCompose(clusters -> {
@@ -500,7 +501,7 @@ public class PersistentTopicsBase extends AdminResource {
                                 }
                             });
                 } else {
-                    return tryCreatePartitionsAsync(numPartitions)
+                    return 
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
                             .thenCompose(ignore -> 
updatePartitionedTopic(topicName, numPartitions, force));
                 }
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index df32386152a..8e3f3adbfee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -273,7 +273,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0"
-                    + " and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
+                    + " and less than or equal to 
maxNumPartitionsPerPartitionedTopic"
+                    + " and number of new partitions must be greater than 
existing number of partitions"),
             @ApiResponse(code = 409, message = "Partitioned topic does not 
exist")})
     public void updatePartitionedTopic(
             @Suspended final AsyncResponse asyncResponse,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f283bb7aa5c..742f27161f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -832,7 +832,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and"
-                    + " less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
+                    + " less than or equal to 
maxNumPartitionsPerPartitionedTopic"
+                    + " and number of new partitions must be greater than 
existing number of partitions"),
             @ApiResponse(code = 409, message = "Partitioned topic does not 
exist"),
             @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
             @ApiResponse(code = 500, message = "Internal server error")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 29d81d609b4..e4097b68e88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1600,5 +1600,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
         partitionedTopicMetadata = metaCaptor.getValue();
         Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+        // check number of new partitions must be greater than existing number 
of partitions
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Throwable> throwableCaptor = 
ArgumentCaptor.forClass(Throwable.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, 
testNamespaceLocal, topicName, false, true,
+                true, 3);
+        verify(response, 
timeout(5000).times(1)).resume(throwableCaptor.capture());
+        Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+                "Number of new partitions must be greater than existing number 
of partitions");
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, 
testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
     }
 }

Reply via email to