congbobo184 commented on code in PR #15296:
URL: https://github.com/apache/pulsar/pull/15296#discussion_r859293737


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,32 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected void internalScaleTransactionCoordinators(AsyncResponse asyncResponse, int replicas) {
+        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+        if (maxPartitions > 0 && replicas > maxPartitions) {
+            throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be less than or equal to " + maxPartitions);
+        }
+        validateSuperUserAccessAsync().thenAccept((ignore) -> {
+                    namespaceResources().getPartitionedTopicResources()
+                            .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                                if (p.partitions >= replicas) {
+                                    throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                            "Number of partitions should "

Review Comment:
   ```suggestion
                                               "Number of partitions should "
   ```
   Use "Number of transaction coordinators" here instead of "Number of partitions".



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,32 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected void internalScaleTransactionCoordinators(AsyncResponse asyncResponse, int replicas) {
+        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+        if (maxPartitions > 0 && replicas > maxPartitions) {
+            throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be less than or equal to " + maxPartitions);

Review Comment:
   Maybe we don't need this config to limit the number of transaction coordinators.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,32 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected void internalScaleTransactionCoordinators(AsyncResponse asyncResponse, int replicas) {
+        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+        if (maxPartitions > 0 && replicas > maxPartitions) {
+            throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be less than or equal to " + maxPartitions);
+        }
+        validateSuperUserAccessAsync().thenAccept((ignore) -> {
+                    namespaceResources().getPartitionedTopicResources()
+                            .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                                if (p.partitions >= replicas) {
+                                    throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                            "Number of partitions should "
+                                            + "be more than the current number of transaction coordinator partitions");

Review Comment:
   ```suggestion
                                               + "be more than the current number of transaction coordinator partitions");
   ```
   This should read "be more than the current number of transaction coordinators".



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java:
##########
@@ -335,4 +338,17 @@ public TransactionPendingAckInternalStats getPendingAckInternalStats(String topi
         return sync(() -> getPendingAckInternalStatsAsync(topic, subName, metadata));
     }
 
+    @Override
+    public void scaleTransactionCoordinators(int replicas) throws PulsarAdminException {
+         sync(() -> scaleTransactionCoordinatorsAsync(replicas));
+    }
+
+    @Override
+    public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas) {
+        checkArgument(replicas > 0, "Number of partitions must be more than 0");

Review Comment:
   Use "transaction coordinators" instead of "partitions" in this message.
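
   To make the wording comments concrete, here is a rough, self-contained sketch of how the two checks might read once the messages talk about transaction coordinators. The class `ScaleCoordinatorsSketch` and its methods are hypothetical stand-ins, not Pulsar code: plain `IllegalArgumentException` replaces both Guava's `checkArgument` and `RestException`, and the exact message text is only one possible phrasing.

   ```java
   // Hypothetical illustration only; none of these names exist in Pulsar.
   final class ScaleCoordinatorsSketch {

       // Stand-in for the client-side precondition in scaleTransactionCoordinatorsAsync.
       static void checkReplicas(int replicas) {
           if (replicas <= 0) {
               throw new IllegalArgumentException(
                       "Number of transaction coordinators must be more than 0");
           }
       }

       // Stand-in for the broker-side check: only scaling up is allowed.
       // Returns the new coordinator count when the request is valid.
       static int validateScaleUp(int currentCoordinators, int replicas) {
           checkReplicas(replicas);
           if (currentCoordinators >= replicas) {
               throw new IllegalArgumentException(
                       "Number of transaction coordinators should be more than "
                               + "the current number of transaction coordinators");
           }
           return replicas;
       }
   }
   ```

   With this shape, a request that does not increase the count is rejected with a message that names transaction coordinators rather than partitions.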



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
