This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a511874a81 MINOR: Use TransactionOperation enum instead of String for
pending state checks (#21218)
7a511874a81 is described below
commit 7a511874a81fb3660e55892f3873ca8e5987096a
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sun Jan 4 00:31:08 2026 +0900
MINOR: Use TransactionOperation enum instead of String for pending state
checks (#21218)
### PR Description
This PR refactors `TransactionManager#throwIfPendingState` to use a
typed `TransactionOperation` enum instead of a raw `String`.
### Background / Motivation
The previous String-based API relied on callers passing operation
names by hand, which made it easy for the names to drift or become
inconsistent over time. Using an enum makes the intent more explicit and
keeps the operation naming centralized.
### What’s changed
- Added a `TransactionOperation` enum with a `displayName` (and
`toString()` override) to preserve readable exception messages.
- Updated relevant call sites (`beginTransaction`, `prepareTransaction`,
`sendOffsetsToTransaction`, `maybeAddPartition`) to pass the
corresponding enum value.
- Updated `throwIfPendingState` to accept `TransactionOperation` instead
of `String`.
### Behavior / Compatibility
- No behavioral change is intended. The exception message remains
human-readable and consistent via `TransactionOperation#toString()`.
### Testing
No additional tests were added since this is a small refactor; existing
tests should continue to cover the pending-transition behavior.
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../producer/internals/TransactionManager.java | 28 ++++++++++++++++++----
1 file changed, 23 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 969085809e6..9e30ab1e66f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -205,6 +205,24 @@ public class TransactionManager {
this.priority = priority;
}
}
+
+ private enum TransactionOperation {
+ SEND("send"),
+ BEGIN_TRANSACTION("beginTransaction"),
+ PREPARE_TRANSACTION("prepareTransaction"),
+ SEND_OFFSETS_TO_TRANSACTION("sendOffsetsToTransaction");
+
+ final String displayName;
+
+ TransactionOperation(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public String toString() {
+ return displayName;
+ }
+ }
public TransactionManager(final LogContext logContext,
final String transactionalId,
@@ -331,7 +349,7 @@ public class TransactionManager {
public synchronized void beginTransaction() {
ensureTransactional();
- throwIfPendingState("beginTransaction");
+ throwIfPendingState(TransactionOperation.BEGIN_TRANSACTION);
maybeFailWithError();
transitionTo(State.IN_TRANSACTION);
}
@@ -343,7 +361,7 @@ public class TransactionManager {
*/
public synchronized void prepareTransaction() {
ensureTransactional();
- throwIfPendingState("prepareTransaction");
+ throwIfPendingState(TransactionOperation.PREPARE_TRANSACTION);
maybeFailWithError();
transitionTo(State.PREPARED_TRANSACTION);
this.preparedTxnState = new ProducerIdAndEpoch(
@@ -406,7 +424,7 @@ public class TransactionManager {
public synchronized TransactionalRequestResult
sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
final ConsumerGroupMetadata groupMetadata) {
ensureTransactional();
- throwIfPendingState("sendOffsetsToTransaction");
+ throwIfPendingState(TransactionOperation.SEND_OFFSETS_TO_TRANSACTION);
maybeFailWithError();
if (currentState != State.IN_TRANSACTION) {
@@ -438,7 +456,7 @@ public class TransactionManager {
public synchronized void maybeAddPartition(TopicPartition topicPartition) {
maybeFailWithError();
- throwIfPendingState("send");
+ throwIfPendingState(TransactionOperation.SEND);
if (isTransactional()) {
if (!hasProducerId()) {
@@ -1248,7 +1266,7 @@ public class TransactionManager {
return new TxnOffsetCommitHandler(result, builder);
}
- private void throwIfPendingState(String operation) {
+ private void throwIfPendingState(TransactionOperation operation) {
if (pendingTransition != null) {
if (pendingTransition.result.isAcked()) {
pendingTransition = null;