This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eeffd8c4757 KAFKA-19003: Add forceTerminateTransaction command to CLI
tools (#19276)
eeffd8c4757 is described below
commit eeffd8c4757434080c806fe143fddc07b58f4c44
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Apr 2 11:51:26 2025 -0700
KAFKA-19003: Add forceTerminateTransaction command to CLI tools (#19276)
This patch is part of KIP-939 [Support Participation in
2PC](https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC)
The kafka-transactions.sh tool will support a new command
--forceTerminateTransaction It has one required argument
--transactionalId that would take the transactional id for the
transaction to be terminated.
The command uses the existing Admin#fenceProducers method to forcefully
abort the transaction associated with the specified transactional ID.
Under the hood, it sends an InitProducerId request to the transaction
coordinator with the given transactional ID and keepPreparedTxn = false
by default. This is aligned with the functionality outlined in the KIP.
We will be creating a new public method in the Admin Client **public
TerminateTransactionResult forceTerminateTransaction(String
transactionalId)**, and re-use the existing fence producer method.
Reviewers: Artem Livshits <[email protected]>, Justine Olshan
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 26 +++++++
.../kafka/clients/admin/ForwardingAdmin.java | 5 ++
.../kafka/clients/admin/KafkaAdminClient.java | 29 ++++++++
.../clients/admin/TerminateTransactionOptions.java | 31 ++++++++
.../clients/admin/TerminateTransactionResult.java | 39 ++++++++++
.../kafka/clients/admin/KafkaAdminClientTest.java | 87 ++++++++++++++++++++++
.../kafka/clients/admin/MockAdminClient.java | 5 ++
.../TestingMetricsInterceptingAdminClient.java | 7 ++
.../apache/kafka/tools/TransactionsCommand.java | 39 +++++++++-
.../kafka/tools/TransactionsCommandTest.java | 31 +++++++-
10 files changed, 297 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index aa30650c6a5..3bd122c46d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -2049,4 +2049,30 @@ public interface Admin extends AutoCloseable {
* Get the metrics kept by the adminClient
*/
Map<MetricName, ? extends Metric> metrics();
+
+ /**
+ * Force terminate a transaction for the given transactional ID with the
default options.
+ * <p>
+ * This is a convenience method for {@link
#forceTerminateTransaction(String, TerminateTransactionOptions)}
+ * with default options.
+ *
+ * @param transactionalId The ID of the transaction to terminate.
+ * @return The TerminateTransactionResult.
+ */
+ default TerminateTransactionResult forceTerminateTransaction(String
transactionalId) {
+ return forceTerminateTransaction(transactionalId, new
TerminateTransactionOptions());
+ }
+
+ /**
+ * Force terminate a transaction for the given transactional ID.
+ * This operation aborts any ongoing transaction associated with the
transactional ID.
+ * It's similar to fenceProducers but only targets a single transactional
ID to handle
+ * long-running transactions when 2PC is enabled.
+ *
+ * @param transactionalId The ID of the transaction to terminate.
+ * @param options The options to use when terminating the
transaction.
+ * @return The TerminateTransactionResult.
+ */
+ TerminateTransactionResult forceTerminateTransaction(String
transactionalId,
+
TerminateTransactionOptions options);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index f4c7b4c4876..8b7db2f04f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -283,6 +283,11 @@ public class ForwardingAdmin implements Admin {
return delegate.abortTransaction(spec, options);
}
+ @Override
+ public TerminateTransactionResult forceTerminateTransaction(String
transactionalId, TerminateTransactionOptions options) {
+ return delegate.forceTerminateTransaction(transactionalId, options);
+ }
+
@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions
options) {
return delegate.listTransactions(options);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c4dcceae63c..e85dcf21e03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4839,6 +4839,35 @@ public class KafkaAdminClient extends AdminClient {
return new AbortTransactionResult(future.all());
}
+ /**
+ * Forcefully terminates an ongoing transaction for a given transactional
ID.
+ * <p>
+ * This API is intended for well-formed but long-running transactions that
are known to the
+ * transaction coordinator. It is primarily designed for supporting 2PC
(two-phase commit) workflows,
+ * where a coordinator may need to unilaterally terminate a participant
transaction that hasn't completed.
+ * </p>
+ *
+ * @param transactionalId The transactional ID whose active
transaction should be forcefully terminated.
+ * @return a {@link TerminateTransactionResult} that can be used to await
the operation result.
+ */
+ @Override
+ public TerminateTransactionResult forceTerminateTransaction(String
transactionalId, TerminateTransactionOptions options) {
+ // Simply leverage the existing fenceProducers implementation with a
single transactional ID
+ FenceProducersOptions fenceOptions = new FenceProducersOptions();
+ if (options.timeoutMs() != null) {
+ fenceOptions.timeoutMs(options.timeoutMs());
+ }
+
+ FenceProducersResult fenceResult = fenceProducers(
+ Collections.singleton(transactionalId),
+ fenceOptions
+ );
+
+ // Convert the result to a TerminateTransactionResult
+ KafkaFuture<Void> future =
fenceResult.fencedProducers().get(transactionalId);
+ return new TerminateTransactionResult(future);
+ }
+
@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions
options) {
AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>>
future =
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
new file mode 100644
index 00000000000..0b8caaee935
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link Admin#forceTerminateTransaction(String,
TerminateTransactionOptions)}.
+ */
+public class TerminateTransactionOptions extends
AbstractOptions<TerminateTransactionOptions> {
+
+ @Override
+ public String toString() {
+ return "TerminateTransactionOptions{" +
+ "timeoutMs=" + timeoutMs +
+ '}';
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
new file mode 100644
index 00000000000..18fee2477be
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#forceTerminateTransaction(String)} call.
+ */
+public class TerminateTransactionResult {
+
+ private final KafkaFuture<Void> future;
+
+ TerminateTransactionResult(KafkaFuture<Void> future) {
+ this.future = future;
+ }
+
+ /**
+ * Return a future which indicates whether the transaction was
successfully terminated.
+ */
+ public KafkaFuture<Void> result() {
+ return future;
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 5e3a884742d..ac309e1c4d9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
@@ -9746,6 +9747,92 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testForceTerminateTransaction() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ String transactionalId = "testForceTerminate";
+ Node transactionCoordinator =
env.cluster().nodes().iterator().next();
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+ Errors.NONE,
+ transactionalId,
+ transactionCoordinator
+ ));
+
+ // Complete the init PID request successfully
+ InitProducerIdResponseData initProducerIdResponseData = new
InitProducerIdResponseData()
+ .setProducerId(5678)
+ .setProducerEpoch((short) 123);
+
+ env.kafkaClient().prepareResponseFrom(request ->
+ request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(initProducerIdResponseData),
+ transactionCoordinator
+ );
+
+ // Call force terminate and verify results
+ TerminateTransactionResult result =
env.adminClient().forceTerminateTransaction(transactionalId);
+ assertNull(result.result().get());
+ }
+ }
+
+ @Test
+ public void testForceTerminateTransactionWithError() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ String transactionalId = "testForceTerminateError";
+ Node transactionCoordinator =
env.cluster().nodes().iterator().next();
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+ Errors.NONE,
+ transactionalId,
+ transactionCoordinator
+ ));
+
+ // Return an error from the InitProducerId request
+ env.kafkaClient().prepareResponseFrom(request ->
+ request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(new InitProducerIdResponseData()
+
.setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code())),
+ transactionCoordinator
+ );
+
+ // Call force terminate and verify error is propagated
+ TerminateTransactionResult result =
env.adminClient().forceTerminateTransaction(transactionalId);
+ ExecutionException exception =
assertThrows(ExecutionException.class, () -> result.result().get());
+ assertTrue(exception.getCause() instanceof
TransactionalIdAuthorizationException);
+ }
+ }
+
+ @Test
+ public void testForceTerminateTransactionWithCustomTimeout() throws
Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ String transactionalId = "testForceTerminateTimeout";
+ Node transactionCoordinator =
env.cluster().nodes().iterator().next();
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+ Errors.NONE,
+ transactionalId,
+ transactionCoordinator
+ ));
+
+ // Complete the init PID request
+ InitProducerIdResponseData initProducerIdResponseData = new
InitProducerIdResponseData()
+ .setProducerId(9012)
+ .setProducerEpoch((short) 456);
+
+ env.kafkaClient().prepareResponseFrom(request ->
+ request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(initProducerIdResponseData),
+ transactionCoordinator
+ );
+
+ // Use custom timeout
+ TerminateTransactionOptions options = new
TerminateTransactionOptions().timeoutMs(10000);
+ TerminateTransactionResult result =
env.adminClient().forceTerminateTransaction(transactionalId, options);
+ assertNull(result.result().get());
+ }
+ }
+
@Test
public void testListTransactions() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 9bd566ac11c..f58548465d5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1382,6 +1382,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public TerminateTransactionResult forceTerminateTransaction(String
transactionalId, TerminateTransactionOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public ListTransactionsResult listTransactions(ListTransactionsOptions
options) {
throw new UnsupportedOperationException("Not implemented yet");
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 60a042aa28d..2005da2d2ce 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -141,6 +141,8 @@ import
org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.clients.admin.RemoveRaftVoterResult;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.clients.admin.TerminateTransactionOptions;
+import org.apache.kafka.clients.admin.TerminateTransactionResult;
import org.apache.kafka.clients.admin.UnregisterBrokerOptions;
import org.apache.kafka.clients.admin.UnregisterBrokerResult;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
@@ -416,6 +418,11 @@ public class TestingMetricsInterceptingAdminClient extends
AdminClient {
return adminDelegate.abortTransaction(spec, options);
}
+ @Override
+ public TerminateTransactionResult forceTerminateTransaction(final String
transactionalId, final TerminateTransactionOptions options) {
+ return adminDelegate.forceTerminateTransaction(transactionalId,
options);
+ }
+
@Override
public ListTransactionsResult listTransactions(final
ListTransactionsOptions options) {
return adminDelegate.listTransactions(options);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index af9df7769f1..508347c819e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -254,6 +254,42 @@ public abstract class TransactionsCommand {
}
}
+ static class ForceTerminateTransactionsCommand extends TransactionsCommand
{
+
+ ForceTerminateTransactionsCommand(Time time) {
+ super(time);
+ }
+
+ @Override
+ String name() {
+ return "forceTerminateTransaction";
+ }
+
+ @Override
+ void addSubparser(Subparsers subparsers) {
+ Subparser subparser = subparsers.addParser(name())
+ .description("Force abort an ongoing transaction on
transactionalId")
+ .help("Force abort an ongoing transaction on transactionalId
(requires administrative privileges)");
+
+ subparser.addArgument("--transactionalId")
+ .help("transactional id")
+ .action(store())
+ .type(String.class)
+ .required(true);
+ }
+
+ @Override
+ void execute(Admin admin, Namespace ns, PrintStream out) throws
Exception {
+ String transactionalId = ns.getString("transactionalId");
+
+ try {
+
admin.forceTerminateTransaction(transactionalId).result().get();
+ } catch (ExecutionException e) {
+ printErrorAndExit("Failed to force terminate transactionalId
`" + transactionalId + "`", e.getCause());
+ }
+ }
+ }
+
static class DescribeProducersCommand extends TransactionsCommand {
static final List<String> HEADERS = asList(
"ProducerId",
@@ -990,7 +1026,8 @@ public abstract class TransactionsCommand {
new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time),
new AbortTransactionCommand(time),
- new FindHangingTransactionsCommand(time)
+ new FindHangingTransactionsCommand(time),
+ new ForceTerminateTransactionsCommand(time)
);
ArgumentParser parser = buildBaseParser();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index efb25d027f0..8785d4f1159 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TerminateTransactionResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
@@ -239,6 +240,35 @@ public class TransactionsCommandTest {
assertEquals(expectedRows, new HashSet<>(table.subList(1,
table.size())));
}
+ @Test
+ public void testForceTerminateTransaction() throws Exception {
+ String transactionalId = "foo";
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "forceTerminateTransaction",
+ "--transactionalId",
+ transactionalId
+ };
+
+ TerminateTransactionResult terminateTransactionResult =
Mockito.mock(TerminateTransactionResult.class);
+ KafkaFuture<Void> future = KafkaFuture.completedFuture(null);
+ Mockito.when(terminateTransactionResult.result()).thenReturn(future);
+
Mockito.when(admin.forceTerminateTransaction(transactionalId)).thenReturn(terminateTransactionResult);
+
+ execute(args);
+ assertNormalExit();
+ }
+
+ @Test
+ public void testForceTerminateTransactionTransactionalIdRequired() throws
Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "force-terminate"
+ });
+ }
+
@Test
public void testDescribeTransactionsTransactionalIdRequired() throws
Exception {
assertCommandFailure(new String[]{
@@ -1066,5 +1096,4 @@ public class TransactionsCommandTest {
assertTrue(exitProcedure.hasExited());
assertEquals(1, exitProcedure.statusCode());
}
-
}