codelipenghui commented on code in PR #15296:
URL: https://github.com/apache/pulsar/pull/15296#discussion_r859593616
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -432,4 +433,27 @@ protected void validateTopicName(String property, String namespace, String encod
             throw new RestException(Response.Status.PRECONDITION_FAILED,
                     "Topic name is not valid");
         }
     }
+
+    protected void internalScaleTransactionCoordinators(AsyncResponse asyncResponse, int replicas) {
+        validateSuperUserAccessAsync().thenAccept((ignore) -> {
+            namespaceResources().getPartitionedTopicResources()
+                    .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
+                        if (p.partitions >= replicas) {
+                            throw new RestException(Response.Status.NOT_ACCEPTABLE,
+                                    "Number of transaction coordinators should "
+                                    + "be more than the current number of transaction coordinator");
+                        }
+                        return new PartitionedTopicMetadata(replicas);
+                    }).thenAccept(r -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        log.error("{} Failed to update the scale of transaction coordinators", clientAppId());
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        }).exceptionally(e -> {
+            log.error("{} Failed to update the scale of transaction coordinators", clientAppId());
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return null;
+        });
Review Comment:
Use `thenCompose().thenAccept().exceptionally()` to simplify the
implementation.
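
A minimal, self-contained sketch of what the flattened chain could look like. It is not the PR's code: `validateAccess`, `updatePartitions`, `failed`, and the string "responses" are hypothetical stand-ins for `validateSuperUserAccessAsync()`, `updatePartitionedTopicAsync(...)`, and `asyncResponse`, chosen only so the example runs without the broker. The point is that `thenCompose` flattens the nested future, so a single `exceptionally` sees failures from both stages.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class ScaleCoordinatorsSketch {

    // Helper (hypothetical): build an already-failed future.
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    // Stand-in (assumption) for validateSuperUserAccessAsync().
    static CompletableFuture<Void> validateAccess(boolean superUser) {
        if (superUser) {
            return CompletableFuture.completedFuture(null);
        }
        return failed(new SecurityException("not a super user"));
    }

    // Stand-in (assumption) for updatePartitionedTopicAsync(...):
    // rejects a request that does not increase the partition count.
    static CompletableFuture<Integer> updatePartitions(int current, int replicas) {
        if (current >= replicas) {
            return failed(new IllegalArgumentException(
                    "replicas must exceed the current count"));
        }
        return CompletableFuture.completedFuture(replicas);
    }

    // Flat chain: thenCompose().thenAccept().exceptionally().
    // The returned string stands in for resuming the async response.
    static String scale(boolean superUser, int current, int replicas) {
        AtomicReference<String> response = new AtomicReference<>();
        validateAccess(superUser)
                .thenCompose(ignore -> updatePartitions(current, replicas))
                .thenAccept(r -> response.set("204 No Content"))
                .exceptionally(ex -> {
                    // Failures from either stage arrive here, usually wrapped
                    // in a CompletionException, so unwrap before reporting.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    response.set("error: " + cause.getMessage());
                    return null;
                })
                .join();
        return response.get();
    }
}
```

With this shape the duplicated `log.error(...)` / `resumeAsyncResponseExceptionally(...)` handler in the diff would only need to appear once, at the end of the chain.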
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.