This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7164561dfc5 [Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
7164561dfc5 is described below
commit 7164561dfc50cefb3f13e7ab62f3408e6066c059
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Apr 29 16:01:00 2022 +0800
[Improve][txn] Add admin api updateTransactionCoordinatorNumber (#15296)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 14 +++++++
.../pulsar/broker/admin/v3/Transactions.java | 25 ++++++++++++
.../pulsar/broker/admin/AdminApiTlsAuthTest.java | 20 +++++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/admin/Transactions.java | 15 +++++++
.../client/admin/internal/TransactionsImpl.java | 16 ++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++
.../apache/pulsar/admin/cli/CmdTransactions.java | 12 ++++++
8 files changed, 153 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index bbd2e0be9c3..f1631e0751a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -432,4 +433,17 @@ public abstract class TransactionsBase extends
AdminResource {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Topic name is not valid");
}
}
+
+ protected CompletableFuture<Void> internalScaleTransactionCoordinators(int
replicas) {
+ return validateSuperUserAccessAsync()
+ .thenCompose((ignore) ->
namespaceResources().getPartitionedTopicResources()
+
.updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p
-> {
+ if (p.partitions >= replicas) {
+ throw new
RestException(Response.Status.NOT_ACCEPTABLE,
+ "Number of transaction coordinators
should "
+ + "be more than the current
number of transaction coordinator");
+ }
+ return new PartitionedTopicMetadata(replicas);
+ }));
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 7b2c99ff9d5..48499134361 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -29,6 +29,7 @@ import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -36,6 +37,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -314,4 +316,27 @@ public class Transactions extends TransactionsBase {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
}
+
+ @POST
+ @Path("/transactionCoordinator/replicas")
+ @ApiResponses(value = {
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 406, message = "The number of replicas should
be more than "
+ + "the current number of transaction coordinator
replicas"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access")})
+ public void scaleTransactionCoordinators(@Suspended final AsyncResponse
asyncResponse, int replicas) {
+ try {
+ checkTransactionCoordinatorEnabled();
+ internalScaleTransactionCoordinators(replicas)
+ .thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(e -> {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("{} Failed to update the scale of transaction
coordinators", clientAppId());
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index 311e31be735..fcf68c907bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -48,6 +48,8 @@ import
org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.tls.NoopHostnameVerifier;
@@ -195,6 +197,24 @@ public class AdminApiTlsAuthTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testSuperUserCanUpdateScaleOfTransactionCoordinators() throws
Exception {
+ getPulsar().getConfiguration().setTransactionCoordinatorEnabled(true);
+ pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(3));
+ PulsarAdmin admin = buildAdminClient("admin");
+ admin.transactions().scaleTransactionCoordinators(4);
+ int partitions = pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
+ .get().get().partitions;
+ Assert.assertEquals(partitions, 4);
+ }
+
@Test
public void testProxyRoleCantDeleteResourceGroups() throws Exception {
try (PulsarAdmin admin = buildAdminClient("admin")) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index d46b640aac3..34492b3ca34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -60,6 +61,9 @@ import org.testng.annotations.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -519,6 +523,49 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
} catch (PulsarAdminException ex) {
assertEquals(ex.getStatusCode(),
HttpStatus.SC_SERVICE_UNAVAILABLE);
}
+ try {
+ admin.transactions().scaleTransactionCoordinators(1);
+ } catch (PulsarAdminException ex) {
+ assertEquals(ex.getStatusCode(),
HttpStatus.SC_SERVICE_UNAVAILABLE);
+ }
+ }
+
+ @Test
+ public void testUpdateTransactionCoordinatorNumber() throws Exception {
+ int coordinatorSize = 3;
+ pulsar.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(coordinatorSize));
+ try {
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize
- 1);
+ fail();
+ } catch (PulsarAdminException pulsarAdminException) {
+ assertEquals(pulsarAdminException.getStatusCode(),
HttpStatus.SC_NOT_ACCEPTABLE);
+ }
+ try {
+ admin.transactions().scaleTransactionCoordinators(-1);
+ fail();
+ } catch (PulsarAdminException pulsarAdminException) {
+ assertEquals(pulsarAdminException.getCause().getMessage(),
+ "Number of transaction coordinators must be more than 0");
+ }
+
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2);
+ pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+ pulsarClient.close();
+ Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size() ==
+ coordinatorSize * 2);
+ pulsar.getConfiguration().setAuthenticationEnabled(true);
+ Set<String> proxyRoles = spy(Set.class);
+ doReturn(true).when(proxyRoles).contains(any());
+ pulsar.getConfiguration().setProxyRoles(proxyRoles);
+ try {
+ admin.transactions().scaleTransactionCoordinators(coordinatorSize
* 2 + 1);
+ fail();
+ } catch (PulsarAdminException.NotAuthorizedException ignored) {
+ }
}
private static void verifyCoordinatorStats(String state,
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index b082f1c1780..803f50f8bdd 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -243,4 +243,19 @@ public interface Transactions {
TransactionPendingAckInternalStats getPendingAckInternalStats(String
topic, String subName,
boolean
metadata) throws PulsarAdminException;
+ /**
+ * Sets the scale of the transaction coordinators.
+ * And currently, we can only support scale-up.
+ * @param replicas the new transaction coordinators size.
+ */
+ void scaleTransactionCoordinators(int replicas) throws
PulsarAdminException;
+
+ /**
+ * Asynchronously sets the size of the transaction coordinators.
+ * And currently, we can only support scale-up.
+ * @param replicas the new transaction coordinators size.
+ * @return a future that can be used to track when the transaction
coordinator number is updated.
+ */
+ CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 46262d20531..d31ae9cfeaf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.client.admin.internal;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Authentication;
@@ -335,4 +338,17 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
return sync(() -> getPendingAckInternalStatsAsync(topic, subName,
metadata));
}
+ @Override
+ public void scaleTransactionCoordinators(int replicas) throws
PulsarAdminException {
+ sync(() -> scaleTransactionCoordinatorsAsync(replicas));
+ }
+
+ @Override
+ public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int
replicas) {
+ checkArgument(replicas > 0, "Number of transaction coordinators must
be more than 0");
+ WebTarget path = adminV3Transactions.path("transactionCoordinator");
+ path = path.path("replicas");
+ return asyncPostRequest(path, Entity.entity(replicas,
MediaType.APPLICATION_JSON));
+ }
+
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c23d6f47cd5..299ee713c68 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2056,6 +2056,10 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("pending-ack-internal-stats -t test -s
test"));
verify(transactions).getPendingAckInternalStats("test", "test", false);
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
+ verify(transactions).scaleTransactionCoordinators(3);
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index e6953817b0d..f6d11e5b4a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -178,6 +178,17 @@ public class CmdTransactions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Update the scale of transaction
coordinators")
+ private class ScaleTransactionCoordinators extends CliCommand {
+ @Parameter(names = { "-r", "--replicas" }, description = "The scale of
the transaction coordinators")
+ private int replicas;
+ @Override
+ void run() throws Exception {
+ getAdmin().transactions().scaleTransactionCoordinators(replicas);
+ }
+ }
+
+
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
jcommander.addCommand("coordinator-internal-stats", new
GetCoordinatorInternalStats());
@@ -189,5 +200,6 @@ public class CmdTransactions extends CmdBase {
jcommander.addCommand("transaction-in-pending-ack-stats", new
GetTransactionInPendingAckStats());
jcommander.addCommand("transaction-metadata", new
GetTransactionMetadata());
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
+ jcommander.addCommand("scale-transactionCoordinators", new
ScaleTransactionCoordinators());
}
}