This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a81f35c1c8f KAFKA-14831: Illegal state errors should be fatal in 
transactional producer (#13591)
a81f35c1c8f is described below

commit a81f35c1c8f9dc594aa585618c36f92ade0f86e2
Author: Kirk True <[email protected]>
AuthorDate: Thu Jun 29 11:21:15 2023 -0700

    KAFKA-14831: Illegal state errors should be fatal in transactional producer 
(#13591)
    
    Poison the transaction manager if we detect an illegal transition in the 
Sender thread. A ThreadLocal is stored in TransactionManager so that the 
Sender can inform TransactionManager which thread it's using.
    
    Reviewers: Daniel Urban <[email protected]>, Justine Olshan 
<[email protected]>, Jason Gustafson <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kafka/clients/producer/internals/Sender.java   | 20 +++++-
 .../producer/internals/TransactionManager.java     | 82 ++++++++++++++++++++--
 .../producer/internals/TransactionManagerTest.java | 53 ++++++++++++++
 4 files changed, 150 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 185c8dd283d..d7e36890cdd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -86,7 +86,7 @@
               
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+              
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
 
     <suppress checks="JavaNCSS"
               
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1cf141c06e2..15c4be8c7b9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -238,6 +238,9 @@ public class Sender implements Runnable {
     public void run() {
         log.debug("Starting Kafka producer I/O thread.");
 
+        if (transactionManager != null)
+            transactionManager.setPoisonStateOnInvalidTransition(true);
+
         // main loop, runs until close is called
         while (running) {
             try {
@@ -264,7 +267,14 @@ public class Sender implements Runnable {
         while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
             if (!transactionManager.isCompleting()) {
                 log.info("Aborting incomplete transaction due to shutdown");
-                transactionManager.beginAbort();
+
+                try {
+                    // It is possible for the transaction manager to throw 
errors when aborting. Catch these
+                    // so as not to interfere with the rest of the shutdown 
logic.
+                    transactionManager.beginAbort();
+                } catch (Exception e) {
+                    log.error("Error in kafka producer I/O thread while 
aborting transaction: ", e);
+                }
             }
             try {
                 runOnce();
@@ -784,7 +794,13 @@ public class Sender implements Runnable {
         boolean adjustSequenceNumbers
     ) {
         if (transactionManager != null) {
-            transactionManager.handleFailedBatch(batch, topLevelException, 
adjustSequenceNumbers);
+            try {
+                // This call can throw an exception in the rare case that 
there's an invalid state transition
+                // attempted. Catch these so as not to interfere with the rest 
of the logic.
+                transactionManager.handleFailedBatch(batch, topLevelException, 
adjustSequenceNumbers);
+            } catch (Exception e) {
+                log.debug("Encountered error when handling a failed batch", e);
+            }
         }
 
         this.sensors.recordErrors(batch.topicPartition.topic(), 
batch.recordCount);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 32bb4779160..e8ffb97fa71 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -120,6 +122,58 @@ public class TransactionManager {
     private final Set<TopicPartition> newPartitionsInTransaction;
     private final Set<TopicPartition> pendingPartitionsInTransaction;
     private final Set<TopicPartition> partitionsInTransaction;
+
+    /**
+     * During its normal course of operations, the transaction manager 
transitions through different internal
+     * states (i.e. by updating {@link #currentState}) to one of those defined 
in {@link State}. These state transitions
+     * result from actions on one of the following classes of threads:
+     *
+     * <ul>
+     *     <li><em>Application</em> threads that invoke {@link Producer} API 
calls</li>
+     *     <li><em>{@link Sender}</em> thread operations</li>
+     * </ul>
+     *
+     * When an invalid state transition is detected during execution on an 
<em>application</em> thread, the
+     * {@link #currentState} is <em>not updated</em> and an {@link 
IllegalStateException} is thrown. This gives the
+     * application the opportunity to fix the issue without permanently 
poisoning the state of the
+     * transaction manager. The {@link Producer} API calls that perform a 
state transition include:
+     *
+     * <ul>
+     *     <li>{@link Producer#initTransactions()} calls {@link 
#initializeTransactions()}</li>
+     *     <li>{@link Producer#beginTransaction()} calls {@link 
#beginTransaction()}</li>
+     *     <li>{@link Producer#commitTransaction()} calls {@link 
#beginCommit()}</li>
+     *     <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
+     *     </li>
+     *     <li>{@link Producer#sendOffsetsToTransaction(Map, 
ConsumerGroupMetadata)} calls
+     *         {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
+     *     </li>
+     *     <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
+     *         {@link #maybeAddPartition(TopicPartition)} and
+     *         {@link #maybeTransitionToErrorState(RuntimeException)}
+     *     </li>
+     * </ul>
+     *
+     * <p/>
+     *
+     * The {@link Producer} is implemented such that much of its work 
is delegated to and performed asynchronously on the
+     * <em>{@link Sender}</em> thread. This includes record batching, network 
I/O, broker response handlers, etc. If an
+     * invalid state transition is detected in the <em>{@link Sender}</em> 
thread, in addition to throwing an
+     * {@link IllegalStateException}, the transaction manager intentionally 
"poisons" itself by setting its
+     * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which 
it cannot recover.
+     *
+     * <p/>
+     *
+     * It's important to prevent possible corruption when the transaction 
manager has determined that it is in a
+     * fatal state. Subsequent transaction operations attempted via either the 
<em>application</em> or the
+     * <em>{@link Sender}</em> thread should fail. This is achieved when these 
operations invoke the
+     * {@link #maybeFailWithError()} method, as it causes a {@link 
KafkaException} to be thrown, ensuring the stated
+     * transactional guarantees are not violated.
+     *
+     * <p/>
+     *
+     * See KAFKA-14831 for more detail.
+     */
+    private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
     private PendingStateTransition pendingTransition;
 
     // This is used by the TxnRequestHandlers to control how long to back off 
before a given request is retried.
@@ -211,6 +265,7 @@ public class TransactionManager {
         this.newPartitionsInTransaction = new HashSet<>();
         this.pendingPartitionsInTransaction = new HashSet<>();
         this.partitionsInTransaction = new HashSet<>();
+        this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() 
-> false);
         this.pendingRequests = new PriorityQueue<>(10, 
Comparator.comparingInt(o -> o.priority().priority));
         this.pendingTxnOffsetCommits = new HashMap<>();
         this.partitionsWithUnresolvedSequences = new HashMap<>();
@@ -220,6 +275,10 @@ public class TransactionManager {
         this.apiVersions = apiVersions;
     }
 
+    void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
+        shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
+    }
+
     public synchronized TransactionalRequestResult initializeTransactions() {
         return initializeTransactions(ProducerIdAndEpoch.NONE);
     }
@@ -425,6 +484,11 @@ public class TransactionManager {
     }
 
     synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition 
topicPartition) {
+        if (hasFatalError()) {
+            log.debug("Ignoring producer ID and epoch update request since the 
producer is in fatal error state");
+            return;
+        }
+
         if (hasStaleProducerIdAndEpoch(topicPartition) && 
!hasInflightBatches(topicPartition)) {
             // If the batch was on a different ID and/or epoch (due to an 
epoch bump) and all its in-flight batches
             // have completed, reset the partition sequence so that the next 
batch (with the new epoch) starts from 0
@@ -984,11 +1048,17 @@ public class TransactionManager {
     private void transitionTo(State target, RuntimeException error) {
         if (!currentState.isTransitionValid(currentState, target)) {
             String idString = transactionalId == null ?  "" : "TransactionalId 
" + transactionalId + ": ";
-            throw new IllegalStateException(idString + "Invalid transition 
attempted from state "
-                    + currentState.name() + " to state " + target.name());
-        }
+            String message = idString + "Invalid transition attempted from 
state "
+                    + currentState.name() + " to state " + target.name();
 
-        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+            if (shouldPoisonStateOnInvalidTransition.get()) {
+                currentState = State.FATAL_ERROR;
+                lastError = new IllegalStateException(message);
+                throw lastError;
+            } else {
+                throw new IllegalStateException(message);
+            }
+        } else if (target == State.FATAL_ERROR || target == 
State.ABORTABLE_ERROR) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + 
target + " with a null exception");
             lastError = error;
@@ -1024,6 +1094,10 @@ public class TransactionManager {
             throw new InvalidProducerEpochException("Producer with 
transactionalId '" + transactionalId
                     + "' and " + producerIdAndEpoch + " attempted to produce 
with an old epoch");
         }
+        if (lastError instanceof IllegalStateException) {
+            throw new IllegalStateException("Producer with transactionalId '" 
+ transactionalId
+                    + "' and " + producerIdAndEpoch + " cannot execute 
transactional method because of previous invalid state transition attempt", 
lastError);
+        }
         throw new KafkaException("Cannot execute transactional method because 
we are in an error state", lastError);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8d26d3ae83d..1fbfcec0a3e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -3446,6 +3446,59 @@ public class TransactionManagerTest {
         assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
     }
 
+    @Test
+    public void testBackgroundInvalidStateTransitionIsFatal() {
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        transactionManager.setPoisonStateOnInvalidTransition(true);
+
+        // Intentionally perform an operation that will cause an invalid state 
transition. The detection of this
+        // will result in a poisoning of the transaction manager for all 
subsequent transactional operations since
+        // it was performed in the background.
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.handleFailedBatch(batchWithValue(tp0, "test"), new 
KafkaException(), false));
+        assertTrue(transactionManager.hasFatalError());
+
+        // Validate that all of these operations will fail after the invalid 
state transition attempt above.
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.beginTransaction());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.beginAbort());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.beginCommit());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new 
ConsumerGroupMetadata("fake-group-id")));
+    }
+
+    @Test
+    public void testForegroundInvalidStateTransitionIsRecoverable() {
+        // Intentionally perform an operation that will cause an invalid state 
transition. The detection of this
+        // will not poison the transaction manager since it was performed in 
the foreground.
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.beginAbort());
+        assertFalse(transactionManager.hasFatalError());
+
+        // Validate that the transactions can still run after the invalid 
state transition attempt above.
+        doInitTransactions();
+        assertTrue(transactionManager.isTransactional());
+
+        transactionManager.beginTransaction();
+        assertFalse(transactionManager.hasFatalError());
+
+        transactionManager.maybeAddPartition(tp1);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(tp1, Errors.NONE);
+        runUntil(() -> transactionManager.isPartitionAdded(tp1));
+
+        TransactionalRequestResult retryResult = 
transactionManager.beginCommit();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
+        runUntil(() -> !transactionManager.hasOngoingTransaction());
+        runUntil(retryResult::isCompleted);
+        retryResult.await();
+        runUntil(retryResult::isAcked);
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
     private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws 
InterruptedException {
         final long nowMs = time.milliseconds();
         return accumulator.append(tp.topic(), tp.partition(), nowMs, 
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,

Reply via email to