This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8b4560e3f0f KAFKA-15767 Refactor TransactionManager to avoid use of ThreadLocal (#19440) 8b4560e3f0f is described below commit 8b4560e3f0f8e6cc16fe9c4c6eac95d6ae9b7c51 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Wed Apr 23 09:31:30 2025 -0700 KAFKA-15767 Refactor TransactionManager to avoid use of ThreadLocal (#19440) Introduces a concrete subclass of `KafkaThread` named `SenderThread`. The poisoning of the TransactionManager on invalid state changes is determined by looking at the type of the current thread. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/clients/producer/KafkaProducer.java | 7 +- .../kafka/clients/producer/internals/Sender.java | 10 +- .../producer/internals/TransactionManager.java | 112 ++++++++++----------- .../kafka/clients/producer/KafkaProducerTest.java | 5 +- .../producer/internals/TransactionManagerTest.java | 35 ++++++- 5 files changed, 99 insertions(+), 70 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 16512c42d5f..baaf13388d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -75,7 +75,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -256,7 +255,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { private final ProducerMetadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Thread ioThread; + private final Sender.SenderThread ioThread; private final Compression compression; private final Sensor errors; private final Time time; @@ -454,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.errors = this.metrics.sensor("errors"); this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; - this.ioThread = new KafkaThread(ioThreadName, this.sender, true); + this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, true); this.ioThread.start(); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); @@ -480,7 +479,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ProducerInterceptors<K, V> interceptors, Partitioner partitioner, Time time, - KafkaThread ioThread, + Sender.SenderThread ioThread, Optional<ClientTelemetryReporter> clientTelemetryReporter) { this.producerConfig = config; this.time = time; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 614fe562d87..6739facfc34 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -234,9 +235,6 @@ public class Sender implements Runnable { public void run() { log.debug("Starting Kafka producer I/O thread."); - if (transactionManager != null) - transactionManager.setPoisonStateOnInvalidTransition(true); - // main loop, runs until close is called while (running) { try { @@ -1072,4 +1070,10 @@ public class Sender implements Runnable { } } + public static class SenderThread extends KafkaThread { + + public SenderThread(final String name, Runnable runnable, boolean daemon) { + super(name, runnable, daemon); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ec..b52d5d4836d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -120,58 +120,6 @@ public class TransactionManager { private final Set<TopicPartition> newPartitionsInTransaction; private final Set<TopicPartition> pendingPartitionsInTransaction; private final Set<TopicPartition> partitionsInTransaction; - - /** - * During its normal course of operations, the transaction manager transitions through different internal - * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions - * result from actions on one of the following classes of threads: - * - * <ul> - * <li><em>Application</em> threads that invokes {@link Producer} API calls</li> - * <li><em>{@link Sender}</em> thread operations</li> - * </ul> - * - * When an invalid state transition is detected during execution on an <em>application</em> thread, the - * {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the - * application the opportunity to fix the issue without permanently poisoning the state of the - * transaction manager. The {@link Producer} API calls that perform a state transition include: - * - * <ul> - * <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li> - * <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li> - * <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li> - * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()} - * </li> - * <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls - * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} - * </li> - * <li>{@link Producer#send(ProducerRecord)} (and its variants) calls - * {@link #maybeAddPartition(TopicPartition)} and - * {@link #maybeTransitionToErrorState(RuntimeException)} - * </li> - * </ul> - * - * <p/> - * - * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the - * <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an - * invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an - * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its - * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. - * - * <p/> - * - * It's important to prevent possible corruption when the transaction manager has determined that it is in a - * fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the - * <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the - * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated - * transactional guarantees are not violated. - * - * <p/> - * - * See KAFKA-14831 for more detail. - */ - private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition; private PendingStateTransition pendingTransition; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. @@ -265,7 +213,6 @@ public class TransactionManager { this.newPartitionsInTransaction = new HashSet<>(); this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); - this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false); this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); this.pendingTxnOffsetCommits = new HashMap<>(); this.partitionsWithUnresolvedSequences = new HashMap<>(); @@ -275,8 +222,61 @@ public class TransactionManager { this.apiVersions = apiVersions; } - void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { - shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); + /** + * During its normal course of operations, the transaction manager transitions through different internal + * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions + * result from actions on one of the following classes of threads: + * + * <ul> + * <li><em>Application</em> threads that invokes {@link Producer} API calls</li> + * <li><em>{@link Sender}</em> thread operations</li> + * </ul> + * + * When an invalid state transition is detected during execution on an <em>application</em> thread, the + * {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the + * application the opportunity to fix the issue without permanently poisoning the state of the + * transaction manager. The {@link Producer} API calls that perform a state transition include: + * + * <ul> + * <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li> + * <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li> + * <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li> + * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()} + * </li> + * <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls + * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} + * </li> + * <li>{@link Producer#send(ProducerRecord)} (and its variants) calls + * {@link #maybeAddPartition(TopicPartition)} and + * {@link #maybeTransitionToErrorState(RuntimeException)} + * </li> + * </ul> + * + * <p/> + * + * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the + * <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an + * invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an + * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its + * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. + * + * <p/> + * + * It's important to prevent possible corruption when the transaction manager has determined that it is in a + * fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the + * <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the + * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated + * transactional guarantees are not violated. + * + * <p/> + * + * See KAFKA-14831 for more detail. + * + * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, + * {@code false} to throw an exception without first changing the state + */ + protected boolean shouldPoisonStateOnInvalidTransition() { + return Thread.currentThread() instanceof Sender.SenderThread; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1063,7 +1063,7 @@ public class TransactionManager { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (shouldPoisonStateOnInvalidTransition.get()) { + if (shouldPoisonStateOnInvalidTransition()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index fb2f4f01282..48569f1a20c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -87,7 +87,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -2592,7 +2591,7 @@ public class KafkaProducerTest { private final Map<String, Object> configs; private final Serializer<T> serializer; private final Partitioner partitioner = mock(Partitioner.class); - private final KafkaThread ioThread = mock(KafkaThread.class); + private final Sender.SenderThread senderThread = mock(Sender.SenderThread.class); private final List<ProducerInterceptor<T, T>> interceptors = new ArrayList<>(); private ProducerMetadata metadata = mock(ProducerMetadata.class); private RecordAccumulator accumulator = mock(RecordAccumulator.class); @@ -2673,7 +2672,7 @@ public class KafkaProducerTest { interceptors, partitioner, time, - ioThread, + senderThread, Optional.empty() ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8b4decfb959..5520df03467 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -149,7 +149,7 @@ public class TransactionManagerTest { private RecordAccumulator accumulator = null; private Sender sender = null; - private TransactionManager transactionManager = null; + private TestableTransactionManager transactionManager = null; private Node brokerNode = null; private long finalizedFeaturesEpoch = 0; @@ -188,7 +188,7 @@ public class TransactionManagerTest { .setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1)), finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; - this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), + this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null), transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; @@ -1038,7 +1038,7 @@ public class TransactionManagerTest { .setMaxVersionLevel((short) 1) .setMinVersionLevel((short) 1)), 0)); - this.transactionManager = new TransactionManager(logContext, transactionalId, + this.transactionManager = new TestableTransactionManager(logContext, transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; @@ -3802,7 +3802,7 @@ public class TransactionManagerTest { doInitTransactions(); assertTrue(transactionManager.isTransactional()); - transactionManager.setPoisonStateOnInvalidTransition(true); + transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true); // Intentionally perform an operation that will cause an invalid state transition. The detection of this // will result in a poisoning of the transaction manager for all subsequent transactional operations since @@ -4373,4 +4373,31 @@ public class TransactionManagerTest { ProducerTestUtils.runUntil(sender, condition); } + /** + * This subclass exists only to optionally change the default behavior related to poisoning the state + * on invalid state transition attempts. + */ + private static class TestableTransactionManager extends TransactionManager { + + private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride; + + public TestableTransactionManager(LogContext logContext, + String transactionalId, + int transactionTimeoutMs, + long retryBackoffMs, + ApiVersions apiVersions) { + super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions); + this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty(); + } + + private void setShouldPoisonStateOnInvalidTransitionOverride(boolean override) { + shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override); + } + + @Override + protected boolean shouldPoisonStateOnInvalidTransition() { + // If there's an override, use it, otherwise invoke the default (i.e. super class) logic. + return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition); + } + } }