This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8b4560e3f0f KAFKA-15767 Refactor TransactionManager to avoid use of 
ThreadLocal (#19440)
8b4560e3f0f is described below

commit 8b4560e3f0f8e6cc16fe9c4c6eac95d6ae9b7c51
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Wed Apr 23 09:31:30 2025 -0700

    KAFKA-15767 Refactor TransactionManager to avoid use of ThreadLocal (#19440)
    
    Introduces a concrete subclass of `KafkaThread` named `SenderThread`.
    Whether the TransactionManager poisons its state on an invalid state
    transition is now determined by checking whether the current thread is
    a `SenderThread`, replacing the previous per-thread `ThreadLocal` flag.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |   7 +-
 .../kafka/clients/producer/internals/Sender.java   |  10 +-
 .../producer/internals/TransactionManager.java     | 112 ++++++++++-----------
 .../kafka/clients/producer/KafkaProducerTest.java  |   5 +-
 .../producer/internals/TransactionManagerTest.java |  35 ++++++-
 5 files changed, 99 insertions(+), 70 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 16512c42d5f..baaf13388d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -75,7 +75,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -256,7 +255,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final ProducerMetadata metadata;
     private final RecordAccumulator accumulator;
     private final Sender sender;
-    private final Thread ioThread;
+    private final Sender.SenderThread ioThread;
     private final Compression compression;
     private final Sensor errors;
     private final Time time;
@@ -454,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.errors = this.metrics.sensor("errors");
             this.sender = newSender(logContext, kafkaClient, this.metadata);
             String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
-            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
+            this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, 
true);
             this.ioThread.start();
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, 
time.milliseconds());
@@ -480,7 +479,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                   ProducerInterceptors<K, V> interceptors,
                   Partitioner partitioner,
                   Time time,
-                  KafkaThread ioThread,
+                  Sender.SenderThread ioThread,
                   Optional<ClientTelemetryReporter> clientTelemetryReporter) {
         this.producerConfig = config;
         this.time = time;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 614fe562d87..6739facfc34 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -52,6 +52,7 @@ import 
org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
@@ -234,9 +235,6 @@ public class Sender implements Runnable {
     public void run() {
         log.debug("Starting Kafka producer I/O thread.");
 
-        if (transactionManager != null)
-            transactionManager.setPoisonStateOnInvalidTransition(true);
-
         // main loop, runs until close is called
         while (running) {
             try {
@@ -1072,4 +1070,10 @@ public class Sender implements Runnable {
         }
     }
 
+    /**
+     * Thread type on which the producer's {@link Sender} is run. Having a dedicated subclass of
+     * {@link KafkaThread} allows other code to recognize the sender thread with an
+     * {@code instanceof} check against {@link Thread#currentThread()}.
+     */
+    public static class SenderThread extends KafkaThread {
+
+        /**
+         * Creates the thread; all arguments are forwarded unchanged to the {@link KafkaThread} constructor.
+         *
+         * @param name     the thread name
+         * @param runnable the task to run on this thread
+         * @param daemon   the daemon flag handed to {@link KafkaThread}
+         */
+        public SenderThread(String name, Runnable runnable, boolean daemon) {
+            super(name, runnable, daemon);
+        }
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c78134c72ec..b52d5d4836d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -120,58 +120,6 @@ public class TransactionManager {
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
-
-    /**
-     * During its normal course of operations, the transaction manager 
transitions through different internal
-     * states (i.e. by updating {@link #currentState}) to one of those defined 
in {@link State}. These state transitions
-     * result from actions on one of the following classes of threads:
-     *
-     * <ul>
-     *     <li><em>Application</em> threads that invokes {@link Producer} API 
calls</li>
-     *     <li><em>{@link Sender}</em> thread operations</li>
-     * </ul>
-     *
-     * When an invalid state transition is detected during execution on an 
<em>application</em> thread, the
-     * {@link #currentState} is <em>not updated</em> and an {@link 
IllegalStateException} is thrown. This gives the
-     * application the opportunity to fix the issue without permanently 
poisoning the state of the
-     * transaction manager. The {@link Producer} API calls that perform a 
state transition include:
-     *
-     * <ul>
-     *     <li>{@link Producer#initTransactions()} calls {@link 
#initializeTransactions()}</li>
-     *     <li>{@link Producer#beginTransaction()} calls {@link 
#beginTransaction()}</li>
-     *     <li>{@link Producer#commitTransaction()}} calls {@link 
#beginCommit()}</li>
-     *     <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
-     *     </li>
-     *     <li>{@link Producer#sendOffsetsToTransaction(Map, 
ConsumerGroupMetadata)} calls
-     *         {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
-     *     </li>
-     *     <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
-     *         {@link #maybeAddPartition(TopicPartition)} and
-     *         {@link #maybeTransitionToErrorState(RuntimeException)}
-     *     </li>
-     * </ul>
-     *
-     * <p/>
-     *
-     * The {@link Producer} is implemented such that much of its work 
delegated to and performed asynchronously on the
-     * <em>{@link Sender}</em> thread. This includes record batching, network 
I/O, broker response handlers, etc. If an
-     * invalid state transition is detected in the <em>{@link Sender}</em> 
thread, in addition to throwing an
-     * {@link IllegalStateException}, the transaction manager intentionally 
"poisons" itself by setting its
-     * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which 
it cannot recover.
-     *
-     * <p/>
-     *
-     * It's important to prevent possible corruption when the transaction 
manager has determined that it is in a
-     * fatal state. Subsequent transaction operations attempted via either the 
<em>application</em> or the
-     * <em>{@link Sender}</em> thread should fail. This is achieved when these 
operations invoke the
-     * {@link #maybeFailWithError()} method, as it causes a {@link 
KafkaException} to be thrown, ensuring the stated
-     * transactional guarantees are not violated.
-     *
-     * <p/>
-     *
-     * See KAFKA-14831 for more detail.
-     */
-    private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
     private PendingStateTransition pendingTransition;
 
     // This is used by the TxnRequestHandlers to control how long to back off 
before a given request is retried.
@@ -265,7 +213,6 @@ public class TransactionManager {
         this.newPartitionsInTransaction = new HashSet<>();
         this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
-        this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() 
-> false);
         this.pendingRequests = new PriorityQueue<>(10, 
Comparator.comparingInt(o -> o.priority().priority));
         this.pendingTxnOffsetCommits = new HashMap<>();
         this.partitionsWithUnresolvedSequences = new HashMap<>();
@@ -275,8 +222,61 @@ public class TransactionManager {
         this.apiVersions = apiVersions;
     }
 
-    void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
-        shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
+    /**
+     * Determines whether an invalid state transition detected on the calling thread should poison the
+     * transaction manager, which is the case when the calling thread is a {@link Sender.SenderThread}.
+     *
+     * <p>
+     *
+     * During its normal course of operations, the transaction manager transitions through different internal
+     * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state
+     * transitions result from actions on one of the following classes of threads:
+     *
+     * <ul>
+     *     <li><em>Application</em> threads that invoke {@link Producer} API calls</li>
+     *     <li><em>{@link Sender}</em> thread operations</li>
+     * </ul>
+     *
+     * When an invalid state transition is detected during execution on an <em>application</em> thread, the
+     * {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the
+     * application the opportunity to fix the issue without permanently poisoning the state of the
+     * transaction manager. The {@link Producer} API calls that perform a state transition include:
+     *
+     * <ul>
+     *     <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
+     *     <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
+     *     <li>{@link Producer#commitTransaction()} calls {@link #beginCommit()}</li>
+     *     <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
+     *     </li>
+     *     <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls
+     *         {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
+     *     </li>
+     *     <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
+     *         {@link #maybeAddPartition(TopicPartition)} and
+     *         {@link #maybeTransitionToErrorState(RuntimeException)}
+     *     </li>
+     * </ul>
+     *
+     * <p>
+     *
+     * The {@link Producer} is implemented such that much of its work is delegated to and performed
+     * asynchronously on the <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker
+     * response handlers, etc. If an invalid state transition is detected in the <em>{@link Sender}</em> thread,
+     * in addition to throwing an {@link IllegalStateException}, the transaction manager intentionally
+     * "poisons" itself by setting its {@link #currentState} to {@link State#FATAL_ERROR}, a state from which
+     * it cannot recover.
+     *
+     * <p>
+     *
+     * It's important to prevent possible corruption when the transaction manager has determined that it is in a
+     * fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the
+     * <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the
+     * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the
+     * stated transactional guarantees are not violated.
+     *
+     * <p>
+     *
+     * See KAFKA-14831 for more detail.
+     *
+     * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception,
+     *         {@code false} to throw an exception without first changing the state
+     */
+    protected boolean shouldPoisonStateOnInvalidTransition() {
+        // Only code executing on the dedicated sender thread poisons the state.
+        final Thread caller = Thread.currentThread();
+        return caller instanceof Sender.SenderThread;
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
@@ -1063,7 +1063,7 @@ public class TransactionManager {
             String message = idString + "Invalid transition attempted from 
state "
                     + currentState.name() + " to state " + target.name();
 
-            if (shouldPoisonStateOnInvalidTransition.get()) {
+            if (shouldPoisonStateOnInvalidTransition()) {
                 currentState = State.FATAL_ERROR;
                 lastError = new IllegalStateException(message);
                 throw lastError;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index fb2f4f01282..48569f1a20c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -87,7 +87,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
-import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -2592,7 +2591,7 @@ public class KafkaProducerTest {
         private final Map<String, Object> configs;
         private final Serializer<T> serializer;
         private final Partitioner partitioner = mock(Partitioner.class);
-        private final KafkaThread ioThread = mock(KafkaThread.class);
+        private final Sender.SenderThread senderThread = 
mock(Sender.SenderThread.class);
         private final List<ProducerInterceptor<T, T>> interceptors = new 
ArrayList<>();
         private ProducerMetadata metadata = mock(ProducerMetadata.class);
         private RecordAccumulator accumulator = mock(RecordAccumulator.class);
@@ -2673,7 +2672,7 @@ public class KafkaProducerTest {
                 interceptors,
                 partitioner,
                 time,
-                ioThread,
+                senderThread,
                 Optional.empty()
             );
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8b4decfb959..5520df03467 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -149,7 +149,7 @@ public class TransactionManagerTest {
 
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
-    private TransactionManager transactionManager = null;
+    private TestableTransactionManager transactionManager = null;
     private Node brokerNode = null;
     private long finalizedFeaturesEpoch = 0;
 
@@ -188,7 +188,7 @@ public class TransactionManagerTest {
                 .setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 
1)),
             finalizedFeaturesEpoch));
         finalizedFeaturesEpoch += 1;
-        this.transactionManager = new TransactionManager(logContext, 
transactionalId.orElse(null),
+        this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId.orElse(null),
                 transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
 
         int batchSize = 16 * 1024;
@@ -1038,7 +1038,7 @@ public class TransactionManagerTest {
                 .setMaxVersionLevel((short) 1)
                 .setMinVersionLevel((short) 1)),
             0));
-        this.transactionManager = new TransactionManager(logContext, 
transactionalId,
+        this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId,
             transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
 
         int batchSize = 16 * 1024;
@@ -3802,7 +3802,7 @@ public class TransactionManagerTest {
         doInitTransactions();
         assertTrue(transactionManager.isTransactional());
 
-        transactionManager.setPoisonStateOnInvalidTransition(true);
+        
transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true);
 
         // Intentionally perform an operation that will cause an invalid state 
transition. The detection of this
         // will result in a poisoning of the transaction manager for all 
subsequent transactional operations since
@@ -4373,4 +4373,31 @@ public class TransactionManagerTest {
         ProducerTestUtils.runUntil(sender, condition);
     }
 
+    /**
+     * Test-only {@link TransactionManager} that lets a test force the answer to "should the state be
+     * poisoned on an invalid state transition?" instead of relying on the superclass logic.
+     */
+    private static class TestableTransactionManager extends TransactionManager {
+
+        // Empty until a test forces a value; while empty, the superclass decides.
+        private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride = Optional.empty();
+
+        public TestableTransactionManager(LogContext logContext,
+                                          String transactionalId,
+                                          int transactionTimeoutMs,
+                                          long retryBackoffMs,
+                                          ApiVersions apiVersions) {
+            super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions);
+        }
+
+        // Forces the value returned by shouldPoisonStateOnInvalidTransition() from now on.
+        private void setShouldPoisonStateOnInvalidTransitionOverride(boolean override) {
+            shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override);
+        }
+
+        @Override
+        protected boolean shouldPoisonStateOnInvalidTransition() {
+            // A forced value wins; otherwise fall back to the default (i.e. super class) logic.
+            if (shouldPoisonStateOnInvalidTransitionOverride.isPresent()) {
+                return shouldPoisonStateOnInvalidTransitionOverride.get();
+            }
+            return super.shouldPoisonStateOnInvalidTransition();
+        }
+    }
 }

Reply via email to