codelipenghui commented on code in PR #15296:
URL: https://github.com/apache/pulsar/pull/15296#discussion_r859593616


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,27 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
         }
     }
+
+    protected void internalScaleTransactionCoordinators(AsyncResponse asyncResponse, int replicas) {
+        validateSuperUserAccessAsync().thenAccept((ignore) -> {
+                    namespaceResources().getPartitionedTopicResources()
+                            .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                                if (p.partitions >= replicas) {
+                                    throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                            "Number of transaction coordinators should "
+                                            + "be more than the current number of transaction coordinator");
+                                }
+                                return new PartitionedTopicMetadata(replicas);
+                            }).thenAccept(r -> asyncResponse.resume(Response.noContent().build()))
+                            .exceptionally(ex -> {
+                                log.error("{} Failed to update the scale of transaction coordinators", clientAppId());
+                                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                                return null;
+                            });
+                }).exceptionally(e -> {
+                    log.error("{} Failed to update the scale of transaction coordinators", clientAppId());
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });

Review Comment:
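   Use `thenCompose().thenAccept().exceptionally()` to simplify the implementation:
   chaining the update with `thenCompose()` flattens the nested futures, so a single
   `exceptionally()` handler covers both the access check and the update.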
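
   A minimal, standalone sketch of the suggested chain shape follows. It is not the
   Pulsar code: `validateSuperUser`, `updatePartitions`, and the string responses are
   hypothetical stand-ins for `validateSuperUserAccessAsync()`,
   `updatePartitionedTopicAsync(...)`, and the `AsyncResponse` calls, used only to
   show how one `exceptionally()` can replace the two duplicated handlers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class ScaleCoordinatorsSketch {

    // Hypothetical stand-in for validateSuperUserAccessAsync().
    static CompletableFuture<Void> validateSuperUser(boolean allowed) {
        return allowed
                ? CompletableFuture.completedFuture(null)
                : CompletableFuture.failedFuture(new SecurityException("not a super user"));
    }

    // Hypothetical stand-in for updatePartitionedTopicAsync(...):
    // fails unless the requested count is larger than the current one.
    static CompletableFuture<Void> updatePartitions(int current, int replicas) {
        if (current >= replicas) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("replicas must exceed current partitions"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Flattened chain: thenCompose() sequences the two async steps,
    // thenAccept() handles success, and one exceptionally() handles
    // a failure from either step.
    static String scale(boolean allowed, int current, int replicas) {
        AtomicReference<String> response = new AtomicReference<>();
        validateSuperUser(allowed)
                .thenCompose(ignore -> updatePartitions(current, replicas))
                .thenAccept(r -> response.set("204 No Content"))
                .exceptionally(ex -> {
                    // Dependent stages usually wrap the failure in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    response.set("error: " + cause.getMessage());
                    return null;
                })
                .join();
        return response.get();
    }

    public static void main(String[] args) {
        System.out.println(scale(true, 16, 32));
        System.out.println(scale(true, 32, 16));
        System.out.println(scale(false, 16, 32));
    }
}
```

   With `thenAccept()` in the original, the inner future returned by the update is
   dropped from the outer chain, which is why the error handling had to be repeated
   inside the lambda; `thenCompose()` keeps it in the chain instead.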



