codelipenghui commented on code in PR #15296:
URL: https://github.com/apache/pulsar/pull/15296#discussion_r860337638


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -314,4 +316,25 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo
             resumeAsyncResponseExceptionally(asyncResponse, ex);
         }
     }
+
+    @POST
+    @Path("/transactionCoordinator/replicas")
+    @ApiResponses(value = {
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),

Review Comment:
   Have you added this check in the implementation?
   We should also add:
   
   ```
               @ApiResponse(code = 401, message = "This operation requires super-user access"),
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,24 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();

Review Comment:
   We don't need to create a new CompletableFuture here and then complete it from another thread.



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java:
##########
@@ -189,5 +200,6 @@ public CmdTransactions(Supplier<PulsarAdmin> admin) {
         jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
         jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
         jcommander.addCommand("slow-transactions", new GetSlowTransactions());
+        jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());

Review Comment:
   We need a test that covers this command to make sure the CLI works.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,24 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        validateSuperUserAccessAsync()
+                .thenCompose((ignore)-> namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                            if (p.partitions >= replicas) {
+                                throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                        "Number of transaction coordinators should "
+                                                + "be more than the current number of transaction coordinator");
+                            }
+                            return new PartitionedTopicMetadata(replicas);
+                        }).thenAccept(r -> completableFuture.complete(null)))
+                .exceptionally(e -> {
+                    log.error("{} Failed to update the scale of transaction coordinators", clientAppId());
+                    completableFuture.completeExceptionally(e);
+                    return null;

Review Comment:
   The `scaleTransactionCoordinators` method already handles exceptions, so we don't need to handle them again here. Just return the `CompletableFuture`.
   
   As for the log here, we don't need an error-level log; a warn-level log is enough.
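
The idea of returning the composed chain directly can be illustrated with a minimal, self-contained sketch. It does not use the Pulsar classes: `validateAsync` and `updateAsync` are hypothetical stand-ins for `validateSuperUserAccessAsync()` and `updatePartitionedTopicAsync(...)`, and `IllegalArgumentException` stands in for `RestException`. A failure inside the chain reaches the caller through the returned future, so only the caller needs a handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ScaleSketch {
    // Hypothetical stand-in for validateSuperUserAccessAsync().
    static CompletableFuture<Void> validateAsync() {
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical stand-in for updatePartitionedTopicAsync(...): fails when
    // the requested count is not larger than the current one.
    static CompletableFuture<Void> updateAsync(int current, int replicas) {
        return CompletableFuture.runAsync(() -> {
            if (current >= replicas) {
                throw new IllegalArgumentException("replicas must exceed " + current);
            }
        });
    }

    // Return the composed chain directly: no extra future, no local handler.
    static CompletableFuture<Void> scale(int current, int replicas) {
        return validateAsync().thenCompose(ignore -> updateAsync(current, replicas));
    }

    public static void main(String[] args) {
        // The caller handles the failure once, as the REST-layer method would.
        String result = scale(3, 2)
                .thenApply(v -> "ok")
                .exceptionally(e -> {
                    // Async stages usually wrap the original exception.
                    Throwable cause = e instanceof CompletionException ? e.getCause() : e;
                    return "failed: " + cause.getMessage();
                })
                .join();
        System.out.println(result);
    }
}
```

Because `thenCompose` already yields a `CompletableFuture<Void>` here, no manually completed future is needed in this sketch; whether the real `updatePartitionedTopicAsync` return type lines up the same way is something to confirm against the actual code.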



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,24 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        validateSuperUserAccessAsync()
+                .thenCompose((ignore)-> namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                            if (p.partitions >= replicas) {
+                                throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                        "Number of transaction coordinators should "
+                                                + "be more than the current number of transaction coordinator");
+                            }
+                            return new PartitionedTopicMetadata(replicas);
+                        }).thenAccept(r -> completableFuture.complete(null)))

Review Comment:
   Can we remove the `.thenAccept(r -> completableFuture.complete(null))` and just return the CompletableFuture from `updatePartitionedTopicAsync`?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -519,6 +519,38 @@ public void testTransactionNotEnabled() throws Exception {
         } catch (PulsarAdminException ex) {
             assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
         }
+        try {
+            admin.transactions().scaleTransactionCoordinators(1);
+        } catch (PulsarAdminException ex) {
+            assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
+        }
+    }
+
+    @Test
+    public void testUpdateTransactionCoordinatorNumber() throws Exception {
+        int coordinatorSize = 3;
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(coordinatorSize));
+        try {
+            admin.transactions().scaleTransactionCoordinators(coordinatorSize - 1);

Review Comment:
   Could you please also add a test with authentication enabled? Only a super user should be able to access this API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

