This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a511874a81 MINOR: Use TransactionOperation enum instead of String for pending state checks (#21218)
7a511874a81 is described below

commit 7a511874a81fb3660e55892f3873ca8e5987096a
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sun Jan 4 00:31:08 2026 +0900

    MINOR: Use TransactionOperation enum instead of String for pending state checks (#21218)
    
    ### PR Description
    This PR refactors `TransactionManager#throwIfPendingState` to use a
    typed `TransactionOperation` enum instead of a raw String.
    
    ### Background / Motivation
    The previous String-based API relied on callers passing operation
    names by hand, which makes it easy for the names to drift or become
    inconsistent over time. Using an enum makes the intent explicit and
    keeps the operation names defined in one place.
    
    ### What’s changed
    - Added a `TransactionOperation` enum with a `displayName` (and
    `toString()` override) to preserve readable exception messages.
    - Updated relevant call sites (`beginTransaction`, `prepareTransaction`,
    `sendOffsetsToTransaction`, `maybeAddPartition`) to pass the
    corresponding enum value.
    - Updated `throwIfPendingState` to accept `TransactionOperation` instead
    of `String`.
    
    ### Behavior / Compatibility
    - No behavioral change is intended. The exception message remains
    human-readable and consistent via `TransactionOperation#toString()`.
    
    ### Testing
    No additional tests were added since this is a small refactor; existing
    tests should continue to cover the pending-transition behavior.
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 28 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 969085809e6..9e30ab1e66f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -205,6 +205,24 @@ public class TransactionManager {
             this.priority = priority;
         }
     }
+    
+    private enum TransactionOperation {
+        SEND("send"),
+        BEGIN_TRANSACTION("beginTransaction"),
+        PREPARE_TRANSACTION("prepareTransaction"),
+        SEND_OFFSETS_TO_TRANSACTION("sendOffsetsToTransaction");
+        
+        final String displayName;
+
+        TransactionOperation(String displayName) {
+            this.displayName = displayName;
+        }
+
+        @Override
+        public String toString() {
+            return displayName;
+        }
+    }
 
     public TransactionManager(final LogContext logContext,
                               final String transactionalId,
@@ -331,7 +349,7 @@ public class TransactionManager {
 
     public synchronized void beginTransaction() {
         ensureTransactional();
-        throwIfPendingState("beginTransaction");
+        throwIfPendingState(TransactionOperation.BEGIN_TRANSACTION);
         maybeFailWithError();
         transitionTo(State.IN_TRANSACTION);
     }
@@ -343,7 +361,7 @@ public class TransactionManager {
      */
     public synchronized void prepareTransaction() {
         ensureTransactional();
-        throwIfPendingState("prepareTransaction");
+        throwIfPendingState(TransactionOperation.PREPARE_TRANSACTION);
         maybeFailWithError();
         transitionTo(State.PREPARED_TRANSACTION);
         this.preparedTxnState = new ProducerIdAndEpoch(
@@ -406,7 +424,7 @@ public class TransactionManager {
     public synchronized TransactionalRequestResult 
sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                             
final ConsumerGroupMetadata groupMetadata) {
         ensureTransactional();
-        throwIfPendingState("sendOffsetsToTransaction");
+        throwIfPendingState(TransactionOperation.SEND_OFFSETS_TO_TRANSACTION);
         maybeFailWithError();
 
         if (currentState != State.IN_TRANSACTION) {
@@ -438,7 +456,7 @@ public class TransactionManager {
 
     public synchronized void maybeAddPartition(TopicPartition topicPartition) {
         maybeFailWithError();
-        throwIfPendingState("send");
+        throwIfPendingState(TransactionOperation.SEND);
 
         if (isTransactional()) {
             if (!hasProducerId()) {
@@ -1248,7 +1266,7 @@ public class TransactionManager {
         return new TxnOffsetCommitHandler(result, builder);
     }
 
-    private void throwIfPendingState(String operation) {
+    private void throwIfPendingState(TransactionOperation operation) {
         if (pendingTransition != null) {
             if (pendingTransition.result.isAcked()) {
                 pendingTransition = null;

Reply via email to