This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eeffd8c4757 KAFKA-19003: Add forceTerminateTransaction command to CLI 
tools (#19276)
eeffd8c4757 is described below

commit eeffd8c4757434080c806fe143fddc07b58f4c44
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Apr 2 11:51:26 2025 -0700

    KAFKA-19003: Add forceTerminateTransaction command to CLI tools (#19276)
    
    This patch is part of KIP-939 [Support Participation in 2PC](https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC).
    
    The kafka-transactions.sh tool will support a new command,
    forceTerminateTransaction. It has one required argument,
    --transactionalId, which takes the transactional ID of the
    transaction to be terminated.
    
    The command uses the existing Admin#fenceProducers method to forcefully
    abort the transaction associated with the specified transactional ID.
    Under the hood, it sends an InitProducerId request to the transaction
    coordinator with the given transactional ID and keepPreparedTxn = false
    by default. This is aligned with the functionality outlined in the KIP.
    
    This patch adds a new public method to the Admin client, **public
    TerminateTransactionResult forceTerminateTransaction(String
    transactionalId)**, which re-uses the existing Admin#fenceProducers method.
    
    Reviewers: Artem Livshits <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java | 26 +++++++
 .../kafka/clients/admin/ForwardingAdmin.java       |  5 ++
 .../kafka/clients/admin/KafkaAdminClient.java      | 29 ++++++++
 .../clients/admin/TerminateTransactionOptions.java | 31 ++++++++
 .../clients/admin/TerminateTransactionResult.java  | 39 ++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 87 ++++++++++++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |  5 ++
 .../TestingMetricsInterceptingAdminClient.java     |  7 ++
 .../apache/kafka/tools/TransactionsCommand.java    | 39 +++++++++-
 .../kafka/tools/TransactionsCommandTest.java       | 31 +++++++-
 10 files changed, 297 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index aa30650c6a5..3bd122c46d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -2049,4 +2049,30 @@ public interface Admin extends AutoCloseable {
      * Get the metrics kept by the adminClient
      */
     Map<MetricName, ? extends Metric> metrics();
+
+    /**
+     * Force terminate a transaction for the given transactional ID with the 
default options.
+     * <p>
+     * This is a convenience method for {@link 
#forceTerminateTransaction(String, TerminateTransactionOptions)}
+     * with default options.
+     *
+     * @param transactionalId           The ID of the transaction to terminate.
+     * @return The TerminateTransactionResult.
+     */
+    default TerminateTransactionResult forceTerminateTransaction(String 
transactionalId) {
+        return forceTerminateTransaction(transactionalId, new 
TerminateTransactionOptions());
+    }
+
+    /**
+     * Force terminate a transaction for the given transactional ID.
+     * This operation aborts any ongoing transaction associated with the 
transactional ID.
+     * It's similar to fenceProducers but only targets a single transactional 
ID to handle
+     * long-running transactions when 2PC is enabled.
+     *
+     * @param transactionalId       The ID of the transaction to terminate.
+     * @param options               The options to use when terminating the 
transaction.
+     * @return The TerminateTransactionResult.
+     */
+    TerminateTransactionResult forceTerminateTransaction(String 
transactionalId, 
+                                                        
TerminateTransactionOptions options);
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index f4c7b4c4876..8b7db2f04f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -283,6 +283,11 @@ public class ForwardingAdmin implements Admin {
         return delegate.abortTransaction(spec, options);
     }
 
+    @Override
+    public TerminateTransactionResult forceTerminateTransaction(String 
transactionalId, TerminateTransactionOptions options) {
+        return delegate.forceTerminateTransaction(transactionalId, options);
+    }
+
     @Override
     public ListTransactionsResult listTransactions(ListTransactionsOptions 
options) {
         return delegate.listTransactions(options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c4dcceae63c..e85dcf21e03 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4839,6 +4839,35 @@ public class KafkaAdminClient extends AdminClient {
         return new AbortTransactionResult(future.all());
     }
 
+    /**
+     * Forcefully terminates an ongoing transaction for a given transactional 
ID.
+     * <p>
+     * This API is intended for well-formed but long-running transactions that 
are known to the
+     * transaction coordinator. It is primarily designed for supporting 2PC 
(two-phase commit) workflows,
+     * where a coordinator may need to unilaterally terminate a participant 
transaction that hasn't completed.
+     * </p>
+     *
+     * @param transactionalId       The transactional ID whose active 
transaction should be forcefully terminated.
+     * @return a {@link TerminateTransactionResult} that can be used to await 
the operation result.
+     */
+    @Override
+    public TerminateTransactionResult forceTerminateTransaction(String 
transactionalId, TerminateTransactionOptions options) {
+        // Simply leverage the existing fenceProducers implementation with a 
single transactional ID
+        FenceProducersOptions fenceOptions = new FenceProducersOptions();
+        if (options.timeoutMs() != null) {
+            fenceOptions.timeoutMs(options.timeoutMs());
+        }
+
+        FenceProducersResult fenceResult = fenceProducers(
+            Collections.singleton(transactionalId),
+            fenceOptions
+        );
+
+        // Convert the result to a TerminateTransactionResult
+        KafkaFuture<Void> future = 
fenceResult.fencedProducers().get(transactionalId);
+        return new TerminateTransactionResult(future);
+    }
+
     @Override
     public ListTransactionsResult listTransactions(ListTransactionsOptions 
options) {
         AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> 
future =
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
new file mode 100644
index 00000000000..0b8caaee935
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link Admin#forceTerminateTransaction(String, 
TerminateTransactionOptions)}.
+ */
+public class TerminateTransactionOptions extends 
AbstractOptions<TerminateTransactionOptions> {
+
+    @Override
+    public String toString() {
+        return "TerminateTransactionOptions{" +
+                "timeoutMs=" + timeoutMs +
+                '}';
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
new file mode 100644
index 00000000000..18fee2477be
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/TerminateTransactionResult.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#forceTerminateTransaction(String)} call.
+ */
+public class TerminateTransactionResult {
+
+    private final KafkaFuture<Void> future;
+
+    TerminateTransactionResult(KafkaFuture<Void> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which indicates whether the transaction was 
successfully terminated.
+     */
+    public KafkaFuture<Void> result() {
+        return future;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 5e3a884742d..ac309e1c4d9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicIdException;
@@ -9746,6 +9747,92 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testForceTerminateTransaction() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            String transactionalId = "testForceTerminate";
+            Node transactionCoordinator = 
env.cluster().nodes().iterator().next();
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+                Errors.NONE,
+                transactionalId,
+                transactionCoordinator
+            ));
+
+            // Complete the init PID request successfully
+            InitProducerIdResponseData initProducerIdResponseData = new 
InitProducerIdResponseData()
+                .setProducerId(5678)
+                .setProducerEpoch((short) 123);
+
+            env.kafkaClient().prepareResponseFrom(request ->
+                request instanceof InitProducerIdRequest,
+                new InitProducerIdResponse(initProducerIdResponseData),
+                transactionCoordinator
+            );
+
+            // Call force terminate and verify results
+            TerminateTransactionResult result = 
env.adminClient().forceTerminateTransaction(transactionalId);
+            assertNull(result.result().get());
+        }
+    }
+
+    @Test
+    public void testForceTerminateTransactionWithError() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            String transactionalId = "testForceTerminateError";
+            Node transactionCoordinator = 
env.cluster().nodes().iterator().next();
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+                Errors.NONE,
+                transactionalId,
+                transactionCoordinator
+            ));
+
+            // Return an error from the InitProducerId request
+            env.kafkaClient().prepareResponseFrom(request ->
+                request instanceof InitProducerIdRequest,
+                new InitProducerIdResponse(new InitProducerIdResponseData()
+                    
.setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code())),
+                transactionCoordinator
+            );
+
+            // Call force terminate and verify error is propagated
+            TerminateTransactionResult result = 
env.adminClient().forceTerminateTransaction(transactionalId);
+            ExecutionException exception = 
assertThrows(ExecutionException.class, () -> result.result().get());
+            assertTrue(exception.getCause() instanceof 
TransactionalIdAuthorizationException);
+        }
+    }
+
+    @Test
+    public void testForceTerminateTransactionWithCustomTimeout() throws 
Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            String transactionalId = "testForceTerminateTimeout";
+            Node transactionCoordinator = 
env.cluster().nodes().iterator().next();
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(
+                Errors.NONE,
+                transactionalId,
+                transactionCoordinator
+            ));
+
+            // Complete the init PID request
+            InitProducerIdResponseData initProducerIdResponseData = new 
InitProducerIdResponseData()
+                .setProducerId(9012)
+                .setProducerEpoch((short) 456);
+
+            env.kafkaClient().prepareResponseFrom(request ->
+                request instanceof InitProducerIdRequest,
+                new InitProducerIdResponse(initProducerIdResponseData),
+                transactionCoordinator
+            );
+
+            // Use custom timeout
+            TerminateTransactionOptions options = new 
TerminateTransactionOptions().timeoutMs(10000);
+            TerminateTransactionResult result = 
env.adminClient().forceTerminateTransaction(transactionalId, options);
+            assertNull(result.result().get());
+        }
+    }
+
     @Test
     public void testListTransactions() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 9bd566ac11c..f58548465d5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1382,6 +1382,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public TerminateTransactionResult forceTerminateTransaction(String 
transactionalId, TerminateTransactionOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public ListTransactionsResult listTransactions(ListTransactionsOptions 
options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 60a042aa28d..2005da2d2ce 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -141,6 +141,8 @@ import 
org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.clients.admin.RemoveRaftVoterResult;
 import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
 import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.clients.admin.TerminateTransactionOptions;
+import org.apache.kafka.clients.admin.TerminateTransactionResult;
 import org.apache.kafka.clients.admin.UnregisterBrokerOptions;
 import org.apache.kafka.clients.admin.UnregisterBrokerResult;
 import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
@@ -416,6 +418,11 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
         return adminDelegate.abortTransaction(spec, options);
     }
 
+    @Override
+    public TerminateTransactionResult forceTerminateTransaction(final String 
transactionalId, final TerminateTransactionOptions options) {
+        return adminDelegate.forceTerminateTransaction(transactionalId, 
options);
+    }
+
     @Override
     public ListTransactionsResult listTransactions(final 
ListTransactionsOptions options) {
         return adminDelegate.listTransactions(options);
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index af9df7769f1..508347c819e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -254,6 +254,42 @@ public abstract class TransactionsCommand {
         }
     }
 
+    static class ForceTerminateTransactionsCommand extends TransactionsCommand 
{
+
+        ForceTerminateTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "forceTerminateTransaction";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .description("Force abort an ongoing transaction on 
transactionalId")
+                .help("Force abort an ongoing transaction on transactionalId 
(requires administrative privileges)");
+
+            subparser.addArgument("--transactionalId")
+                .help("transactional id")
+                .action(store())
+                .type(String.class)
+                .required(true);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            String transactionalId = ns.getString("transactionalId");
+
+            try {
+                
admin.forceTerminateTransaction(transactionalId).result().get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to force terminate transactionalId 
`" + transactionalId + "`", e.getCause());
+            }
+        }
+    }
+
     static class DescribeProducersCommand extends TransactionsCommand {
         static final List<String> HEADERS = asList(
             "ProducerId",
@@ -990,7 +1026,8 @@ public abstract class TransactionsCommand {
             new DescribeTransactionsCommand(time),
             new DescribeProducersCommand(time),
             new AbortTransactionCommand(time),
-            new FindHangingTransactionsCommand(time)
+            new FindHangingTransactionsCommand(time),
+            new ForceTerminateTransactionsCommand(time)
         );
 
         ArgumentParser parser = buildBaseParser();
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index efb25d027f0..8785d4f1159 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ListTransactionsResult;
 import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TerminateTransactionResult;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TransactionDescription;
 import org.apache.kafka.clients.admin.TransactionListing;
@@ -239,6 +240,35 @@ public class TransactionsCommandTest {
         assertEquals(expectedRows, new HashSet<>(table.subList(1, 
table.size())));
     }
 
+    @Test
+    public void testForceTerminateTransaction() throws Exception {
+        String transactionalId = "foo";
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "forceTerminateTransaction",
+            "--transactionalId",
+            transactionalId
+        };
+
+        TerminateTransactionResult terminateTransactionResult = 
Mockito.mock(TerminateTransactionResult.class);
+        KafkaFuture<Void> future = KafkaFuture.completedFuture(null);
+        Mockito.when(terminateTransactionResult.result()).thenReturn(future);
+        
Mockito.when(admin.forceTerminateTransaction(transactionalId)).thenReturn(terminateTransactionResult);
+
+        execute(args);
+        assertNormalExit();
+    }
+
+    @Test
+    public void testForceTerminateTransactionTransactionalIdRequired() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "force-terminate"
+        });
+    }
+
     @Test
     public void testDescribeTransactionsTransactionalIdRequired() throws 
Exception {
         assertCommandFailure(new String[]{
@@ -1066,5 +1096,4 @@ public class TransactionsCommandTest {
         assertTrue(exitProcedure.hasExited());
         assertEquals(1, exitProcedure.statusCode());
     }
-
 }

Reply via email to