This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f42abe6db8f KAFKA-19082:[4/4] Complete Txn Client Side Changes (KIP-939) (#19714) f42abe6db8f is described below commit f42abe6db8fb0fc1c668ab77acb8e878205fd32c Author: Ritika Reddy <98577846+rreddy...@users.noreply.github.com> AuthorDate: Thu May 29 09:06:57 2025 -0700 KAFKA-19082:[4/4] Complete Txn Client Side Changes (KIP-939) (#19714) public void completeTransaction(PreparedTxnState preparedTxnState) The method compares the currently prepared transaction state and the state passed in the argument. 1. Commit if the state matches 2. Abort the transaction otherwise. If the producer is not in a prepared state (i.e., neither prepareTransaction was called nor initTransaction(true) was called), we return an INVALID_TXN_STATE error. Reviewers: Justine Olshan <jols...@confluent.io>, Artem Livshits <alivsh...@confluent.io> --- .../kafka/clients/producer/KafkaProducer.java | 34 ++++++++++ .../kafka/clients/producer/MockProducer.java | 21 ++++++ .../apache/kafka/clients/producer/Producer.java | 5 ++ .../producer/internals/TransactionManager.java | 7 +- .../kafka/clients/producer/KafkaProducerTest.java | 77 ++++++++++++++++++++++ 5 files changed, 141 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b44b358dede..71d201b71f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -884,6 +884,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> { producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart); } + /** + * Completes a prepared transaction by comparing the provided prepared transaction state with the + * current prepared state on the producer. + * If they match, the transaction is committed; otherwise, it is aborted. + * + * @param preparedTxnState The prepared transaction state to compare against the current state + * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started + * @throws InvalidTxnStateException if the producer is not in prepared state + * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code> + * @throws InterruptException if the thread is interrupted while blocked + */ + @Override + public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException { + throwIfNoTransactionManager(); + throwIfProducerClosed(); + + if (!transactionManager.isPrepared()) { + throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " + + "Call prepareTransaction() first, or make sure initTransaction(true) was called."); + } + + // Get the current prepared transaction state + PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState(); + + // Compare the prepared transaction state token and commit or abort accordingly + if (currentPreparedState.equals(preparedTxnState)) { + commitTransaction(); + } else { + abortTransaction(); + } + } + /** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index c6e02d4ab16..3e5cb9f5d5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -257,6 +257,27 @@ public class MockProducer<K, V> implements Producer<K, V> { this.transactionInFlight = false; } + @Override + public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException { + verifyNotClosed(); + verifyNotFenced(); + verifyTransactionsInitialized(); + + if (!this.transactionInFlight) { + throw new IllegalStateException("There is no prepared transaction to complete."); + } + + // For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid + // This should match what's returned in prepareTransaction() + PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1); + + if (currentState.equals(preparedTxnState)) { + commitTransaction(); + } else { + abortTransaction(); + } + } + private synchronized void verifyNotClosed() { if (this.closed) { throw new IllegalStateException("MockProducer is already closed."); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index db4460d6b10..e6e94691e34 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -77,6 +77,11 @@ public interface Producer<K, V> extends Closeable { */ void abortTransaction() throws ProducerFencedException; + /** + * See {@link KafkaProducer#completeTransaction(PreparedTxnState)} + */ + void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException; + /** * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 061df29fbb2..316164c974c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -913,7 +913,7 @@ public class TransactionManager { log.debug("Not sending EndTxn for completed transaction since no partitions " + "or offsets were successfully added"); } - completeTransaction(); + resetTransactionState(); } nextRequestHandler = pendingRequests.poll(); } @@ -1320,7 +1320,7 @@ public class TransactionManager { return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled; } - private void completeTransaction() { + private void resetTransactionState() { if (clientSideEpochBumpRequired) { transitionTo(State.INITIALIZING); } else { @@ -1332,6 +1332,7 @@ public class TransactionManager { newPartitionsInTransaction.clear(); pendingPartitionsInTransaction.clear(); partitionsInTransaction.clear(); + preparedTxnState = new PreparedTxnState(); } abstract class TxnRequestHandler implements RequestCompletionHandler { @@ -1743,7 +1744,7 @@ public class TransactionManager { setProducerIdAndEpoch(producerIdAndEpoch); resetSequenceNumbers(); } - completeTransaction(); + resetTransactionState(); result.done(); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 2634968a2bd..8460d0f4c5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.producer.internals.ProducerMetadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.TransactionManager; +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -1600,6 +1601,82 @@ public class KafkaProducerTest { } } + @Test + public void testCompleteTransactionWithMatchingState() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + when(ctx.transactionManager.isPrepared()).thenReturn(true); + when(ctx.sender.isRunning()).thenReturn(true); + + // Create prepared states with matching values + long producerId = 12345L; + short epoch = 5; + PreparedTxnState currentState = new PreparedTxnState(producerId, epoch); + PreparedTxnState inputState = new PreparedTxnState(producerId, epoch); + + // Set up the transaction manager to return the prepared state + when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState); + + // Should trigger commit when states match + TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class); + when(ctx.transactionManager.beginCommit()).thenReturn(commitResult); + + try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) { + // Call completeTransaction with the matching state + producer.completeTransaction(inputState); + + // Verify methods called in order + verify(ctx.transactionManager).isPrepared(); + verify(ctx.transactionManager).preparedTransactionState(); + verify(ctx.transactionManager).beginCommit(); + + // Verify abort was never called + verify(ctx.transactionManager, never()).beginAbort(); + + // Verify sender was woken up + verify(ctx.sender).wakeup(); + } + } + + @Test + public void testCompleteTransactionWithNonMatchingState() throws Exception { + StringSerializer serializer = new StringSerializer(); + KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer); + + when(ctx.transactionManager.isPrepared()).thenReturn(true); + when(ctx.sender.isRunning()).thenReturn(true); + + // Create txn prepared states with different values + long producerId = 12345L; + short epoch = 5; + PreparedTxnState currentState = new PreparedTxnState(producerId, epoch); + PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch); + + // Set up the transaction manager to return the prepared state + when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState); + + // Should trigger abort when states don't match + TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class); + when(ctx.transactionManager.beginAbort()).thenReturn(abortResult); + + try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) { + // Call completeTransaction with the non-matching state + producer.completeTransaction(inputState); + + // Verify methods called in order + verify(ctx.transactionManager).isPrepared(); + verify(ctx.transactionManager).preparedTransactionState(); + verify(ctx.transactionManager).beginAbort(); + + // Verify commit was never called + verify(ctx.transactionManager, never()).beginCommit(); + + // Verify sender was woken up + verify(ctx.sender).wakeup(); + } + } + @Test public void testClusterAuthorizationFailure() throws Exception { int maxBlockMs = 500;