This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 22838eae2f3 [improve][txn] Add command to abort transaction (#21630)
22838eae2f3 is described below
commit 22838eae2f3b365abe09e305d15dbc198b58a231
Author: hrzzzz <[email protected]>
AuthorDate: Thu Jan 18 14:43:56 2024 +0800
[improve][txn] Add command to abort transaction (#21630)
### Motivation
<!-- Explain here the context, and why you're making that change. What is
the problem you're trying to solve. -->
In Pulsar's transaction implementation, a stuck transaction blocks
consumption: messages from the transactions after it cannot be delivered to the
consumer even if those transactions have already been committed. The consumer
stays blocked until the stuck transaction is aborted by its timeout, and only
then does it continue to consume messages. Therefore, we need to provide an
admin command to proactively abort a transaction in such situations.
### Modifications
<!-- Describe the modifications you've done. -->
Introduce a new admin API for aborting a transaction.
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 10 ++++++++
.../pulsar/broker/admin/v3/Transactions.java | 29 ++++++++++++++++++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 23 +++++++++++++++++
.../apache/pulsar/client/admin/Transactions.java | 14 +++++++++++
.../client/admin/internal/TransactionsImpl.java | 13 ++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +++
.../apache/pulsar/admin/cli/CmdTransactions.java | 14 +++++++++++
7 files changed, 107 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 470cdc3e74b..1014c9fe8e3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -560,4 +561,13 @@ public abstract class TransactionsBase extends
AdminResource {
});
return completableFuture;
}
+
+    /**
+     * Aborts the transaction identified by {@code (mostSigBits, leastSigBits)}.
+     *
+     * <p>Two checks run, in this order, before the abort is issued: ownership of the
+     * transaction-coordinator assign topic partition selected by {@code mostSigBits}
+     * (narrowed to {@code int}), then super-user access.
+     *
+     * @param authoritative forwarded unchanged to the topic ownership validation
+     * @param mostSigBits   most significant bits of the transaction ID; also picks the partition to validate
+     * @param leastSigBits  least significant bits of the transaction ID
+     * @return the future returned by the metadata store service's {@code endTransaction} call,
+     *         chained after both validations
+     */
+    protected CompletableFuture<Void> internalAbortTransaction(boolean authoritative, long mostSigBits,
+                                                               long leastSigBits) {
+        return validateTopicOwnershipAsync(
+                SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) mostSigBits), authoritative)
+                .thenCompose(ignored -> validateSuperUserAccessAsync())
+                .thenCompose(ignored -> {
+                    TxnID txnToAbort = new TxnID(mostSigBits, leastSigBits);
+                    // NOTE(review): the trailing `false` is presumably the "is timeout" flag of
+                    // endTransaction -- confirm against TransactionMetadataStoreService.
+                    return pulsar().getTransactionMetadataStoreService()
+                            .endTransaction(txnToAbort, TxnAction.ABORT_VALUE, false);
+                });
+    }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 1bdc2255085..83ee03b2e4f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -439,4 +439,33 @@ public class Transactions extends TransactionsBase {
}
}
+ @POST
+ @Path("/abortTransaction/{mostSigBits}/{leastSigBits}")
+ @ApiOperation(value = "Abort transaction")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic "
+ + "or coordinator or transaction doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic is not owned by this
broker!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access")})
+ public void abortTransaction(@Suspended final AsyncResponse asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false") boolean authoritative,
+ @PathParam("mostSigBits") String mostSigBits,
+ @PathParam("leastSigBits") String
leastSigBits) {
+ try {
+ checkTransactionCoordinatorEnabled();
+ internalAbortTransaction(authoritative,
Long.parseLong(mostSigBits), Long.parseLong(leastSigBits))
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 049fd0f5f44..adf810945de 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -75,7 +75,10 @@ import
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -894,6 +897,26 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
}
+    /**
+     * Opens a transaction, aborts it through the admin API, and checks that the metadata store
+     * afterwards reports the transaction as not found.
+     */
+    @Test
+    public void testAbortTransaction() throws Exception {
+        initTransaction(1);
+
+        // The test aborts explicitly, so the transaction is opened with a five-minute timeout.
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+
+        // Sanity check: the freshly created transaction is OPEN.
+        TxnMeta metaBeforeAbort = pulsar.getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID()).get();
+        assertEquals(metaBeforeAbort.status(), TxnStatus.OPEN);
+
+        admin.transactions().abortTransaction(txn.getTxnID());
+
+        // After the abort, looking the transaction up again must fail with "not found".
+        try {
+            pulsar.getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID()).get();
+            fail();
+        } catch (ExecutionException expected) {
+            Throwable cause = expected.getCause();
+            assertTrue(cause instanceof CoordinatorException.TransactionNotFoundException);
+        }
+    }
+
private static void verifyCoordinatorStats(String state,
long sequenceId, long
lowWaterMark) {
assertEquals(state, "Ready");
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index b0504bee744..8fadabdfba2 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -400,4 +400,18 @@ public interface Transactions {
CompletableFuture<PositionInPendingAckStats>
getPositionStatsInPendingAckAsync(String topic, String subName,
Long ledgerId, Long entryId,
Integer batchIndex);
+
+ /**
+ * Abort a transaction.
+ *
+ * @param txnID the ID of the transaction to abort
+ * @throws PulsarAdminException declared by this method; the concrete failure conditions are
+ *                              defined by the implementation
+ */
+ void abortTransaction(TxnID txnID) throws PulsarAdminException;
+
+ /**
+ * Asynchronously abort a transaction.
+ *
+ * @param txnID the ID of the transaction to abort
+ * @return a future for the abort request; it carries no value ({@code Void})
+ */
+ CompletableFuture<Void> abortTransactionAsync(TxnID txnID);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 2d1dd408ef6..460478787eb 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -283,4 +283,17 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
throws PulsarAdminException {
return sync(() -> getPositionStatsInPendingAckAsync(topic, subName,
ledgerId, entryId, batchIndex));
}
+
+    /**
+     * Issues a POST to {@code abortTransaction/{mostSigBits}/{leastSigBits}} under the v3
+     * transactions target, sending an empty string entity typed as JSON.
+     */
+    @Override
+    public CompletableFuture<Void> abortTransactionAsync(TxnID txnID) {
+        WebTarget abortTarget = adminV3Transactions
+                .path("abortTransaction")
+                .path(Long.toString(txnID.getMostSigBits()))
+                .path(Long.toString(txnID.getLeastSigBits()));
+        return asyncPostRequest(abortTarget, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    /**
+     * Synchronous variant: runs {@link #abortTransactionAsync(TxnID)} through {@code sync}.
+     */
+    @Override
+    public void abortTransaction(TxnID txnID) throws PulsarAdminException {
+        sync(() -> abortTransactionAsync(txnID));
+    }
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index fdcf6bdac35..2d6396d5928 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2415,6 +2415,10 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("coordinators-list"));
verify(transactions).listTransactionCoordinators();
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("abort-transaction -m 1 -l 2"));
+ verify(transactions).abortTransaction(new TxnID(1, 2));
}
@Test
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 279759021d8..41aea611ea8 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -250,6 +250,19 @@ public class CmdTransactions extends CmdBase {
}
}
+    /**
+     * CLI sub-command that aborts one transaction, identified by the two halves of its ID.
+     */
+    @Parameters(commandDescription = "Abort transaction")
+    private class AbortTransaction extends CliCommand {
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
+        private long mostSigBits;
+
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
+        private long leastSigBits;
+
+        /** Builds the transaction ID from the two options and asks the admin client to abort it. */
+        @Override
+        void run() throws Exception {
+            final TxnID txnId = new TxnID(mostSigBits, leastSigBits);
+            getAdmin().transactions().abortTransaction(txnId);
+        }
+    }
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
@@ -266,6 +279,7 @@ public class CmdTransactions extends CmdBase {
jcommander.addCommand("scale-transactionCoordinators", new
ScaleTransactionCoordinators());
jcommander.addCommand("position-stats-in-pending-ack", new
GetPositionStatsInPendingAck());
jcommander.addCommand("coordinators-list", new
ListTransactionCoordinators());
+ jcommander.addCommand("abort-transaction", new AbortTransaction());
}
}