This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4fc9e442c30 KAFKA-17898: Refine Epoch Bumping Logic (#17849)
4fc9e442c30 is described below
commit 4fc9e442c308364acc43046bdaefd87571ab20a5
Author: Ritika Reddy <[email protected]>
AuthorDate: Mon Nov 25 14:29:15 2024 -0800
KAFKA-17898: Refine Epoch Bumping Logic (#17849)
With KAFKA-14562, we implemented epoch bump on both the client and the
server. Listed below are the different epoch bump scenarios we have on hand
after enabling transaction version 2 (TV2):
Non-Transactional Producers
• Epoch bumping is always allowed.
• Different code paths are used to handle epoch bumping.
Transactional Producers
No Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = false when initPIDVersion < 3 or
initPIDVersion = null.
Client-Triggered Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = true when initPIDVersion >= 3.
• TransactionVersion2Enabled = false when endTxnVersion < 5.
Only Server-Triggered Epoch Bump Allowed
• TransactionVersion2Enabled = true and endTxnVersion >= 5.
We want to refine the code and make it more structured so that it correctly
handles epoch bumping in the above-mentioned cases.
The changes made in this patch are:
Rename epochBumpRequired to clientSideEpochBumpRequired to symbolize a manual
epoch bump request from the client
Replace the canBumpEpoch method with needToTriggerEpochBumpFromClient and
canHandleAbortableError, according to the above-mentioned scenarios
Reviewers: Artem Livshits <[email protected]>, Calvin Liu
<[email protected]>, Justine Olshan <[email protected]>
---
.../producer/internals/TransactionManager.java | 133 ++++++++++++++++-----
.../producer/internals/TransactionManagerTest.java | 60 +++++++++-
2 files changed, 155 insertions(+), 38 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3c916de9c0f..38bdc86aedd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -191,7 +191,7 @@ public class TransactionManager {
private volatile RuntimeException lastError = null;
private volatile ProducerIdAndEpoch producerIdAndEpoch;
private volatile boolean transactionStarted = false;
- private volatile boolean epochBumpRequired = false;
+ private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
@@ -351,7 +351,7 @@ public class TransactionManager {
enqueueRequest(handler);
// If an epoch bump is required for recovery, initialize the
transaction after completing the EndTxn request.
- if (epochBumpRequired) {
+ if (clientSideEpochBumpRequired) {
return initializeTransactions(this.producerIdAndEpoch);
}
@@ -477,6 +477,26 @@ public class TransactionManager {
}
}
+ /**
+ * Transitions to an abortable error state if the coordinator can handle
an abortable error or
+ * to a fatal error if not.
+ *
+ * @param abortableException The exception in case of an abortable
error.
+ * @param fatalException The exception in case of a fatal error.
+ */
+ private void transitionToAbortableErrorOrFatalError(
+ RuntimeException abortableException,
+ RuntimeException fatalException
+ ) {
+ if (canHandleAbortableError()) {
+ if (needToTriggerEpochBumpFromClient())
+ clientSideEpochBumpRequired = true;
+ transitionToAbortableError(abortableException);
+ } else {
+ transitionToFatalError(fatalException);
+ }
+ }
+
// visible for testing
synchronized boolean isPartitionAdded(TopicPartition partition) {
return partitionsInTransaction.contains(partition);
@@ -544,8 +564,11 @@ public class TransactionManager {
this.partitionsWithUnresolvedSequences.clear();
}
- synchronized void requestEpochBumpForPartition(TopicPartition tp) {
- epochBumpRequired = true;
+ /**
+ * This method is used to trigger an epoch bump for non-transactional
idempotent producers.
+ */
+ synchronized void requestIdempotentEpochBumpForPartition(TopicPartition
tp) {
+ clientSideEpochBumpRequired = true;
this.partitionsToRewriteSequences.add(tp);
}
@@ -564,12 +587,12 @@ public class TransactionManager {
}
this.partitionsToRewriteSequences.clear();
- epochBumpRequired = false;
+ clientSideEpochBumpRequired = false;
}
synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
if (!isTransactional()) {
- if (epochBumpRequired) {
+ if (clientSideEpochBumpRequired) {
bumpIdempotentProducerEpoch();
}
if (currentState != State.INITIALIZING && !hasProducerId()) {
@@ -675,8 +698,8 @@ public class TransactionManager {
|| exception instanceof UnsupportedVersionException) {
transitionToFatalError(exception);
} else if (isTransactional()) {
- if (canBumpEpoch() && !isCompleting()) {
- epochBumpRequired = true;
+ if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
+ clientSideEpochBumpRequired = true;
}
transitionToAbortableError(exception);
}
@@ -699,7 +722,7 @@ public class TransactionManager {
// If we fail with an OutOfOrderSequenceException, we have a gap
in the log. Bump the epoch for this
// partition, which will reset the sequence number to 0 and allow
us to continue
- requestEpochBumpForPartition(batch.topicPartition);
+ requestIdempotentEpochBumpForPartition(batch.topicPartition);
} else if (exception instanceof UnknownProducerIdException) {
// If we get an UnknownProducerId for a partition, then the broker
has no state for that producer. It will
// therefore accept a write with sequence number 0. We reset the
sequence number for the partition here so
@@ -710,7 +733,7 @@ public class TransactionManager {
} else {
if (adjustSequenceNumbers) {
if (!isTransactional()) {
- requestEpochBumpForPartition(batch.topicPartition);
+
requestIdempotentEpochBumpForPartition(batch.topicPartition);
} else {
txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
}
@@ -760,21 +783,17 @@ public class TransactionManager {
// For the transactional producer, we bump the epoch
if possible, otherwise we transition to a fatal error
String unackedMessagesErr = "The client hasn't
received acknowledgment for some previously " +
"sent messages and can no longer retry them. ";
- if (canBumpEpoch()) {
- epochBumpRequired = true;
- KafkaException exception = new
KafkaException(unackedMessagesErr + "It is safe to abort " +
- "the transaction and continue.");
- transitionToAbortableError(exception);
- } else {
- KafkaException exception = new
KafkaException(unackedMessagesErr + "It isn't safe to continue.");
- transitionToFatalError(exception);
- }
+ KafkaException abortableException = new
KafkaException(unackedMessagesErr + "It is safe to abort " +
+ "the transaction and continue.");
+ KafkaException fatalException = new
KafkaException(unackedMessagesErr + "It isn't safe to continue.");
+
+
transitionToAbortableErrorOrFatalError(abortableException, fatalException);
} else {
// For the idempotent producer, bump the epoch
log.info("No inflight batches remaining for {}, last
ack'd sequence for partition is {}, next sequence is {}. " +
"Going to bump epoch and reset
sequence numbers.", topicPartition,
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER),
sequenceNumber(topicPartition));
- requestEpochBumpForPartition(topicPartition);
+ requestIdempotentEpochBumpForPartition(topicPartition);
}
iter.remove();
@@ -943,7 +962,7 @@ public class TransactionManager {
if (isTransactional()) {
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition,
this.producerIdAndEpoch);
} else {
- requestEpochBumpForPartition(batch.topicPartition);
+
requestIdempotentEpochBumpForPartition(batch.topicPartition);
}
return true;
}
@@ -951,7 +970,7 @@ public class TransactionManager {
if (!isTransactional()) {
// For the idempotent producer, always retry
UNKNOWN_PRODUCER_ID errors. If the batch has the current
// producer ID and epoch, request a bump of the epoch.
Otherwise just retry the produce.
- requestEpochBumpForPartition(batch.topicPartition);
+ requestIdempotentEpochBumpForPartition(batch.topicPartition);
return true;
}
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
@@ -967,7 +986,7 @@ public class TransactionManager {
// and wait to see if the sequence resolves
if (!hasUnresolvedSequence(batch.topicPartition) ||
isNextSequenceForUnresolvedPartition(batch.topicPartition,
batch.baseSequence())) {
- requestEpochBumpForPartition(batch.topicPartition);
+
requestIdempotentEpochBumpForPartition(batch.topicPartition);
}
return true;
}
@@ -1164,23 +1183,59 @@ public class TransactionManager {
return result;
}
+ /**
+ * Determines if an epoch bump can be triggered manually based on the api
versions.
+ *
+ * <b>NOTE:</b>
+ * This method should only be used for transactional producers.
+ * For non-transactional producers epoch bumping is always allowed.
+ *
+ * <ol>
+ * <li><b>Client-Triggered Epoch Bump</b>:
+ * If the coordinator supports epoch bumping
(initProducerIdVersion.maxVersion() >= 3),
+ * client-triggered epoch bumping is allowed, returns true.
+ * <code>clientSideEpochBumpTriggerRequired</code> must be set to
true in this case.</li>
+ *
+ * <li><b>No Epoch Bump Allowed</b>:
+ * If the coordinator does not support epoch bumping, returns
false.</li>
+ *
+ * <li><b>Server-Triggered Only</b>:
+ * When TransactionV2 is enabled, epoch bumping is handled
automatically
+ * by the server in EndTxn, so manual epoch bumping is not
required, returns false.</li>
+ * </ol>
+ *
+ * @return true if a client-triggered epoch bump is allowed, otherwise
false.
+ */
// package-private for testing
- boolean canBumpEpoch() {
- if (!isTransactional()) {
- return true;
- }
+ boolean needToTriggerEpochBumpFromClient() {
+ return coordinatorSupportsBumpingEpoch && !isTransactionV2Enabled;
+ }
- return coordinatorSupportsBumpingEpoch;
+ /**
+ * Determines if the coordinator can handle an abortable error.
+ * Recovering from an abortable error requires an epoch bump which can be
triggered by the client
+ * or automatically taken care of at the end of every transaction
(Transaction V2).
+ * Use <code>needToTriggerEpochBumpFromClient</code> to check whether the
epoch bump needs to be triggered
+ * manually.
+ *
+ * <b>NOTE:</b>
+ * This method should only be used for transactional producers.
+ * There is no concept of abortable errors for idempotent producers.
+ *
+ * @return true if an abortable error can be handled, otherwise false.
+ */
+ boolean canHandleAbortableError() {
+ return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
}
private void completeTransaction() {
- if (epochBumpRequired) {
+ if (clientSideEpochBumpRequired) {
transitionTo(State.INITIALIZING);
} else {
transitionTo(State.READY);
}
lastError = null;
- epochBumpRequired = false;
+ clientSideEpochBumpRequired = false;
transactionStarted = false;
newPartitionsInTransaction.clear();
pendingPartitionsInTransaction.clear();
@@ -1209,9 +1264,23 @@ public class TransactionManager {
transitionToAbortableError(e);
}
+ /**
+ * Determines if an error should be treated as abortable or fatal,
based on transaction state and configuration.
+ * <ol><l> NOTE: Only use this method for transactional producers
</l></ol>
+ *
+ * - <b>Abortable Error</b>:
+ * An abortable error can be handled effectively, if epoch bumping
is supported.
+ * 1) If transactionV2 is enabled, automatic epoch bumping happens
at the end of every transaction.
+ * 2) If the client can trigger an epoch bump, the abortable error
can be handled.
+ *
+ *- <b>Fatal Error</b>:
+ * If epoch bumping is not supported, the system cannot recover
and the error must be treated as fatal.
+ * @param e the error to determine as either abortable or fatal.
+ */
void abortableErrorIfPossible(RuntimeException e) {
- if (canBumpEpoch()) {
- epochBumpRequired = true;
+ if (canHandleAbortableError()) {
+ if (needToTriggerEpochBumpFromClient())
+ clientSideEpochBumpRequired = true;
abortableError(e);
} else {
fatalError(e);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 02570b083ec..9ae80dc19ba 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -678,7 +678,7 @@ public class TransactionManagerTest {
assertEquals(2, transactionManager.sequenceNumber(tp0));
// The producerId might be reset due to a failure on another partition
- transactionManager.requestEpochBumpForPartition(tp1);
+ transactionManager.requestIdempotentEpochBumpForPartition(tp1);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
initializeIdempotentProducerId(producerId + 1, (short) 0);
@@ -780,6 +780,21 @@ public class TransactionManagerTest {
return batch;
}
+ private ProducerBatch writeTransactionalBatchWithValue(
+ TransactionManager manager,
+ TopicPartition tp,
+ String value
+ ) {
+ manager.maybeUpdateProducerIdAndEpoch(tp);
+ int seq = manager.sequenceNumber(tp);
+ manager.incrementSequenceNumber(tp, 1);
+ ProducerBatch batch = batchWithValue(tp, value);
+ batch.setProducerState(manager.producerIdAndEpoch(), seq, true);
+ manager.addInFlightBatch(batch);
+ batch.close();
+ return batch;
+ }
+
private ProducerBatch batchWithValue(TopicPartition tp, String value) {
MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(64),
Compression.NONE, TimestampType.CREATE_TIME, 0L);
@@ -814,7 +829,7 @@ public class TransactionManagerTest {
transactionManager.incrementSequenceNumber(tp1, 3);
assertEquals(transactionManager.sequenceNumber(tp1), 3);
- transactionManager.requestEpochBumpForPartition(tp0);
+ transactionManager.requestIdempotentEpochBumpForPartition(tp0);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
assertEquals(transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp1), 3);
@@ -2948,7 +2963,7 @@ public class TransactionManagerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testEpochBumpAfterLastInflightBatchFails(boolean
transactionV2Enabled) {
+ public void
testEpochBumpAfterLastInFlightBatchFailsIdempotentProducer(boolean
transactionV2Enabled) {
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(producerId, epoch);
initializeIdempotentProducerId(producerId, epoch);
@@ -2980,6 +2995,39 @@ public class TransactionManagerTest {
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMaybeResolveSequencesTransactionalProducer(boolean
transactionV2Enabled) throws Exception {
+ initializeTransactionManager(Optional.of(transactionalId),
transactionV2Enabled);
+
+ // Initialize transaction with initial producer ID and epoch.
+ doInitTransactions(producerId, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartition(tp0);
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+ runUntil(() -> transactionManager.isPartitionAdded(tp0));
+
+ ProducerBatch b1 =
writeTransactionalBatchWithValue(transactionManager, tp0, "1");
+ assertEquals(Integer.valueOf(1),
transactionManager.sequenceNumber(tp0));
+
+ transactionManager.markSequenceUnresolved(b1);
+ assertTrue(transactionManager.hasUnresolvedSequences());
+
+ transactionManager.handleFailedBatch(b1, new TimeoutException(),
false);
+ // Call maybeResolveSequences to trigger resolution logic
+ transactionManager.maybeResolveSequences();
+
+ // Verify the type of error state the transaction is in.
+ if (transactionManager.isTransactionV2Enabled() ||
transactionManager.needToTriggerEpochBumpFromClient()) {
+ // Expected to throw an abortable error when epoch bumping is
allowed
+ assertTrue(transactionManager.hasAbortableError());
+ } else {
+ // Expected to throw a fatal error when epoch bumping is not
allowed
+ assertTrue(transactionManager.hasFatalError());
+ }
+ }
+
@Test
public void testEpochUpdateAfterBumpFromEndTxnResponseInV2() throws
InterruptedException {
initializeTransactionManager(Optional.of(transactionalId), true);
@@ -3506,13 +3554,13 @@ public class TransactionManagerTest {
}
@Test
- public void testCanBumpEpochDuringCoordinatorDisconnect() {
+ public void
testNeedToTriggerEpochBumpFromClientDuringCoordinatorDisconnect() {
doInitTransactions(0, (short) 0);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
- assertTrue(transactionManager.canBumpEpoch());
+ assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION).idString());
- assertTrue(transactionManager.canBumpEpoch());
+ assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
}
@ParameterizedTest