This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a81f35c1c8f KAFKA-14831: Illegal state errors should be fatal in
transactional producer (#13591)
a81f35c1c8f is described below
commit a81f35c1c8f9dc594aa585618c36f92ade0f86e2
Author: Kirk True <[email protected]>
AuthorDate: Thu Jun 29 11:21:15 2023 -0700
KAFKA-14831: Illegal state errors should be fatal in transactional producer
(#13591)
Poison the transaction manager if we detect an illegal transition in the
Sender thread. A ThreadLocal is stored in TransactionManager so that the
Sender thread can flag itself, letting TransactionManager know whether an invalid transition occurred on that thread.
Reviewers: Daniel Urban <[email protected]>, Justine Olshan
<[email protected]>, Jason Gustafson <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/producer/internals/Sender.java | 20 +++++-
.../producer/internals/TransactionManager.java | 82 ++++++++++++++++++++--
.../producer/internals/TransactionManagerTest.java | 53 ++++++++++++++
4 files changed, 150 insertions(+), 7 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 185c8dd283d..d7e36890cdd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -86,7 +86,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
-
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1cf141c06e2..15c4be8c7b9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -238,6 +238,9 @@ public class Sender implements Runnable {
public void run() {
log.debug("Starting Kafka producer I/O thread.");
+ if (transactionManager != null)
+ transactionManager.setPoisonStateOnInvalidTransition(true);
+
// main loop, runs until close is called
while (running) {
try {
@@ -264,7 +267,14 @@ public class Sender implements Runnable {
while (!forceClose && transactionManager != null &&
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
- transactionManager.beginAbort();
+
+ try {
+ // It is possible for the transaction manager to throw
errors when aborting. Catch these
+ // so as not to interfere with the rest of the shutdown
logic.
+ transactionManager.beginAbort();
+ } catch (Exception e) {
+ log.error("Error in kafka producer I/O thread while
aborting transaction: ", e);
+ }
}
try {
runOnce();
@@ -784,7 +794,13 @@ public class Sender implements Runnable {
boolean adjustSequenceNumbers
) {
if (transactionManager != null) {
- transactionManager.handleFailedBatch(batch, topLevelException,
adjustSequenceNumbers);
+ try {
+ // This call can throw an exception in the rare case that
there's an invalid state transition
+ // attempted. Catch these so as not to interfere with the rest
of the logic.
+ transactionManager.handleFailedBatch(batch, topLevelException,
adjustSequenceNumbers);
+ } catch (Exception e) {
+ log.debug("Encountered error when handling a failed batch", e);
+ }
}
this.sensors.recordErrors(batch.topicPartition.topic(),
batch.recordCount);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 32bb4779160..e8ffb97fa71 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -120,6 +122,58 @@ public class TransactionManager {
private final Set<TopicPartition> newPartitionsInTransaction;
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
+
+ /**
+ * During its normal course of operations, the transaction manager
transitions through different internal
+ * states (i.e. by updating {@link #currentState}) to one of those defined
in {@link State}. These state transitions
+ * result from actions on one of the following classes of threads:
+ *
+ * <ul>
+ * <li><em>Application</em> threads that invoke {@link Producer} API
calls</li>
+ * <li><em>{@link Sender}</em> thread operations</li>
+ * </ul>
+ *
+ * When an invalid state transition is detected during execution on an
<em>application</em> thread, the
+ * {@link #currentState} is <em>not updated</em> and an {@link
IllegalStateException} is thrown. This gives the
+ * application the opportunity to fix the issue without permanently
poisoning the state of the
+ * transaction manager. The {@link Producer} API calls that perform a
state transition include:
+ *
+ * <ul>
+ * <li>{@link Producer#initTransactions()} calls {@link
#initializeTransactions()}</li>
+ * <li>{@link Producer#beginTransaction()} calls {@link
#beginTransaction()}</li>
+ * <li>{@link Producer#commitTransaction()} calls {@link
#beginCommit()}</li>
+ * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
+ * </li>
+ * <li>{@link Producer#sendOffsetsToTransaction(Map,
ConsumerGroupMetadata)} calls
+ * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
+ * </li>
+ * <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
+ * {@link #maybeAddPartition(TopicPartition)} and
+ * {@link #maybeTransitionToErrorState(RuntimeException)}
+ * </li>
+ * </ul>
+ *
+ * <p/>
+ *
+ * The {@link Producer} is implemented such that much of its work is
delegated to and performed asynchronously on the
+ * <em>{@link Sender}</em> thread. This includes record batching, network
I/O, broker response handlers, etc. If an
+ * invalid state transition is detected in the <em>{@link Sender}</em>
thread, in addition to throwing an
+ * {@link IllegalStateException}, the transaction manager intentionally
"poisons" itself by setting its
+ * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which
it cannot recover.
+ *
+ * <p/>
+ *
+ * It's important to prevent possible corruption when the transaction
manager has determined that it is in a
+ * fatal state. Subsequent transaction operations attempted via either the
<em>application</em> or the
+ * <em>{@link Sender}</em> thread should fail. This is achieved when these
operations invoke the
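+ * {@link #maybeFailWithError()} method, as it causes an exception to be
thrown (an {@link IllegalStateException} if the poisoning came from an invalid state transition, otherwise a {@link KafkaException}),
+ * ensuring the stated transactional guarantees are not violated.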
+ *
+ * <p/>
+ *
+ * See KAFKA-14831 for more detail.
+ */
+ private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
private PendingStateTransition pendingTransition;
// This is used by the TxnRequestHandlers to control how long to back off
before a given request is retried.
@@ -211,6 +265,7 @@ public class TransactionManager {
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
+ this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(()
-> false);
this.pendingRequests = new PriorityQueue<>(10,
Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.partitionsWithUnresolvedSequences = new HashMap<>();
@@ -220,6 +275,10 @@ public class TransactionManager {
this.apiVersions = apiVersions;
}
+ void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
+ shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
+ }
+
public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
}
@@ -425,6 +484,11 @@ public class TransactionManager {
}
synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition
topicPartition) {
+ if (hasFatalError()) {
+ log.debug("Ignoring producer ID and epoch update request since the
producer is in fatal error state");
+ return;
+ }
+
if (hasStaleProducerIdAndEpoch(topicPartition) &&
!hasInflightBatches(topicPartition)) {
// If the batch was on a different ID and/or epoch (due to an
epoch bump) and all its in-flight batches
// have completed, reset the partition sequence so that the next
batch (with the new epoch) starts from 0
@@ -984,11 +1048,17 @@ public class TransactionManager {
private void transitionTo(State target, RuntimeException error) {
if (!currentState.isTransitionValid(currentState, target)) {
String idString = transactionalId == null ? "" : "TransactionalId
" + transactionalId + ": ";
- throw new IllegalStateException(idString + "Invalid transition
attempted from state "
- + currentState.name() + " to state " + target.name());
- }
+ String message = idString + "Invalid transition attempted from
state "
+ + currentState.name() + " to state " + target.name();
- if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+ if (shouldPoisonStateOnInvalidTransition.get()) {
+ currentState = State.FATAL_ERROR;
+ lastError = new IllegalStateException(message);
+ throw lastError;
+ } else {
+ throw new IllegalStateException(message);
+ }
+ } else if (target == State.FATAL_ERROR || target ==
State.ABORTABLE_ERROR) {
if (error == null)
throw new IllegalArgumentException("Cannot transition to " +
target + " with a null exception");
lastError = error;
@@ -1024,6 +1094,10 @@ public class TransactionManager {
throw new InvalidProducerEpochException("Producer with
transactionalId '" + transactionalId
+ "' and " + producerIdAndEpoch + " attempted to produce
with an old epoch");
}
+ if (lastError instanceof IllegalStateException) {
+ throw new IllegalStateException("Producer with transactionalId '"
+ transactionalId
+ + "' and " + producerIdAndEpoch + " cannot execute
transactional method because of previous invalid state transition attempt",
lastError);
+ }
throw new KafkaException("Cannot execute transactional method because
we are in an error state", lastError);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8d26d3ae83d..1fbfcec0a3e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -3446,6 +3446,59 @@ public class TransactionManagerTest {
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
}
+ @Test
+ public void testBackgroundInvalidStateTransitionIsFatal() {
+ doInitTransactions();
+ assertTrue(transactionManager.isTransactional());
+
+ transactionManager.setPoisonStateOnInvalidTransition(true);
+
+ // Intentionally perform an operation that will cause an invalid state
transition. The detection of this
+ // will result in a poisoning of the transaction manager for all
subsequent transactional operations since
+ // it was performed in the background.
+ assertThrows(IllegalStateException.class, () ->
transactionManager.handleFailedBatch(batchWithValue(tp0, "test"), new
KafkaException(), false));
+ assertTrue(transactionManager.hasFatalError());
+
+ // Validate that all of these operations will fail after the invalid
state transition attempt above.
+ assertThrows(IllegalStateException.class, () ->
transactionManager.beginTransaction());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.beginAbort());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.beginCommit());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata("fake-group-id")));
+ }
+
+ @Test
+ public void testForegroundInvalidStateTransitionIsRecoverable() {
+ // Intentionally perform an operation that will cause an invalid state
transition. The detection of this
+ // will not poison the transaction manager since it was performed in
the foreground.
+ assertThrows(IllegalStateException.class, () ->
transactionManager.beginAbort());
+ assertFalse(transactionManager.hasFatalError());
+
+ // Validate that the transactions can still run after the invalid
state transition attempt above.
+ doInitTransactions();
+ assertTrue(transactionManager.isTransactional());
+
+ transactionManager.beginTransaction();
+ assertFalse(transactionManager.hasFatalError());
+
+ transactionManager.maybeAddPartition(tp1);
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareAddPartitionsToTxn(tp1, Errors.NONE);
+ runUntil(() -> transactionManager.isPartitionAdded(tp1));
+
+ TransactionalRequestResult retryResult =
transactionManager.beginCommit();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT,
producerId, epoch);
+ runUntil(() -> !transactionManager.hasOngoingTransaction());
+ runUntil(retryResult::isCompleted);
+ retryResult.await();
+ runUntil(retryResult::isAcked);
+ assertFalse(transactionManager.hasOngoingTransaction());
+ }
+
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws
InterruptedException {
final long nowMs = time.milliseconds();
return accumulator.append(tp.topic(), tp.partition(), nowMs,
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,