This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f42abe6db8f KAFKA-19082:[4/4] Complete Txn Client Side Changes 
(KIP-939) (#19714)
f42abe6db8f is described below

commit f42abe6db8fb0fc1c668ab77acb8e878205fd32c
Author: Ritika Reddy <98577846+rreddy...@users.noreply.github.com>
AuthorDate: Thu May 29 09:06:57 2025 -0700

    KAFKA-19082:[4/4] Complete Txn Client Side Changes (KIP-939) (#19714)
    
    public void completeTransaction(PreparedTxnState preparedTxnState)
    
    The method compares the producer's current prepared transaction state
    with the state passed in as the argument:
    
    1. Commit the transaction if the states match.
    2. Abort the transaction otherwise.
    
    If the producer is not in a prepared state (i.e., neither
    prepareTransaction() nor initTransactions(true) was called), an
    InvalidTxnStateException (INVALID_TXN_STATE) is thrown.
    
    Reviewers: Justine Olshan <jols...@confluent.io>, Artem Livshits
     <alivsh...@confluent.io>
---
 .../kafka/clients/producer/KafkaProducer.java      | 34 ++++++++++
 .../kafka/clients/producer/MockProducer.java       | 21 ++++++
 .../apache/kafka/clients/producer/Producer.java    |  5 ++
 .../producer/internals/TransactionManager.java     |  7 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 77 ++++++++++++++++++++++
 5 files changed, 141 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b44b358dede..71d201b71f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -884,6 +884,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
+    /**
+     * Completes a prepared transaction by comparing the provided prepared 
transaction state with the
+     * current prepared state on the producer.
+     * If they match, the transaction is committed; otherwise, it is aborted.
+     * 
+     * @param preparedTxnState              The prepared transaction state to 
compare against the current state
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
+     * @throws InvalidTxnStateException if the producer is not in prepared 
state
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws TimeoutException if the time taken for completing the 
transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+        
+        if (!transactionManager.isPrepared()) {
+            throw new InvalidTxnStateException("Cannot complete transaction 
because no transaction has been prepared. " +
+                "Call prepareTransaction() first, or make sure 
initTransaction(true) was called.");
+        }
+        
+        // Get the current prepared transaction state
+        PreparedTxnState currentPreparedState = 
transactionManager.preparedTransactionState();
+        
+        // Compare the prepared transaction state token and commit or abort 
accordingly
+        if (currentPreparedState.equals(preparedTxnState)) {
+            commitTransaction();
+        } else {
+            abortTransaction();
+        }
+    }
+
     /**
      * Asynchronously send a record to a topic. Equivalent to 
<code>send(record, null)</code>.
      * See {@link #send(ProducerRecord, Callback)} for details.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index c6e02d4ab16..3e5cb9f5d5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -257,6 +257,27 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.transactionInFlight = false;
     }
 
+    @Override
+    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
+        verifyNotClosed();
+        verifyNotFenced();
+        verifyTransactionsInitialized();
+        
+        if (!this.transactionInFlight) {
+            throw new IllegalStateException("There is no prepared transaction 
to complete.");
+        }
+
+        // For testing purposes, we'll consider a prepared state with 
producerId=1000L and epoch=1 as valid
+        // This should match what's returned in prepareTransaction()
+        PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
+        
+        if (currentState.equals(preparedTxnState)) {
+            commitTransaction();
+        } else {
+            abortTransaction();
+        }
+    }
+
     private synchronized void verifyNotClosed() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index db4460d6b10..e6e94691e34 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -77,6 +77,11 @@ public interface Producer<K, V> extends Closeable {
      */
     void abortTransaction() throws ProducerFencedException;
 
+    /**
+     * See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
+     */
+    void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException;
+
     /**
      * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) 
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 061df29fbb2..316164c974c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -913,7 +913,7 @@ public class TransactionManager {
                     log.debug("Not sending EndTxn for completed transaction 
since no partitions " +
                             "or offsets were successfully added");
                 }
-                completeTransaction();
+                resetTransactionState();
             }
             nextRequestHandler = pendingRequests.poll();
         }
@@ -1320,7 +1320,7 @@ public class TransactionManager {
         return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
     }
 
-    private void completeTransaction() {
+    private void resetTransactionState() {
         if (clientSideEpochBumpRequired) {
             transitionTo(State.INITIALIZING);
         } else {
@@ -1332,6 +1332,7 @@ public class TransactionManager {
         newPartitionsInTransaction.clear();
         pendingPartitionsInTransaction.clear();
         partitionsInTransaction.clear();
+        preparedTxnState = new PreparedTxnState();
     }
 
     abstract class TxnRequestHandler implements RequestCompletionHandler {
@@ -1743,7 +1744,7 @@ public class TransactionManager {
                     setProducerIdAndEpoch(producerIdAndEpoch);
                     resetSequenceNumbers();
                 }
-                completeTransaction();
+                resetTransactionState();
                 result.done();
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == 
Errors.NOT_COORDINATOR) {
                 
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, 
transactionalId);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2634968a2bd..8460d0f4c5f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -1600,6 +1601,82 @@ public class KafkaProducerTest {
         }
     }
     
+    @Test
+    public void testCompleteTransactionWithMatchingState() throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        when(ctx.transactionManager.isPrepared()).thenReturn(true);
+        when(ctx.sender.isRunning()).thenReturn(true);
+        
+        // Create prepared states with matching values
+        long producerId = 12345L;
+        short epoch = 5;
+        PreparedTxnState currentState = new PreparedTxnState(producerId, 
epoch);
+        PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
+        
+        // Set up the transaction manager to return the prepared state
+        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
+        
+        // Should trigger commit when states match
+        TransactionalRequestResult commitResult = 
mock(TransactionalRequestResult.class);
+        when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
+        
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            // Call completeTransaction with the matching state
+            producer.completeTransaction(inputState);
+            
+            // Verify methods called in order
+            verify(ctx.transactionManager).isPrepared();
+            verify(ctx.transactionManager).preparedTransactionState();
+            verify(ctx.transactionManager).beginCommit();
+            
+            // Verify abort was never called
+            verify(ctx.transactionManager, never()).beginAbort();
+            
+            // Verify sender was woken up
+            verify(ctx.sender).wakeup();
+        }
+    }
+    
+    @Test
+    public void testCompleteTransactionWithNonMatchingState() throws Exception 
{
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        when(ctx.transactionManager.isPrepared()).thenReturn(true);
+        when(ctx.sender.isRunning()).thenReturn(true);
+        
+        // Create txn prepared states with different values
+        long producerId = 12345L;
+        short epoch = 5;
+        PreparedTxnState currentState = new PreparedTxnState(producerId, 
epoch);
+        PreparedTxnState inputState = new PreparedTxnState(producerId + 1, 
epoch);
+        
+        // Set up the transaction manager to return the prepared state
+        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
+        
+        // Should trigger abort when states don't match
+        TransactionalRequestResult abortResult = 
mock(TransactionalRequestResult.class);
+        when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
+        
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            // Call completeTransaction with the non-matching state
+            producer.completeTransaction(inputState);
+            
+            // Verify methods called in order
+            verify(ctx.transactionManager).isPrepared();
+            verify(ctx.transactionManager).preparedTransactionState();
+            verify(ctx.transactionManager).beginAbort();
+            
+            // Verify commit was never called
+            verify(ctx.transactionManager, never()).beginCommit();
+            
+            // Verify sender was woken up
+            verify(ctx.sender).wakeup();
+        }
+    }
+    
     @Test
     public void testClusterAuthorizationFailure() throws Exception {
         int maxBlockMs = 500;

Reply via email to