This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08c876c1fe5 Fix TransactionStateManager handling of empty batch when
loading transaction metadata (#17688)
08c876c1fe5 is described below
commit 08c876c1fe5b32d8301b2cc4cabc53f2d3088312
Author: Vincent Jiang <[email protected]>
AuthorDate: Tue Nov 5 14:22:20 2024 -0800
Fix TransactionStateManager handling of empty batch when loading
transaction metadata (#17688)
When loading transaction metadata from a transaction log partition, if the
partition contains a segment ending with an empty batch, the "currOffset"
update logic is skipped for that last batch. Because "currOffset" is never
advanced to the offset following the last batch,
TransactionStateManager.loadTransactionMetadata gets stuck in its
"while" loop.
This change fixes the issue by updating "currOffset" after processing each
batch, whether the batch is empty or not.
Reviewers: Justine Olshan <[email protected]>, Jun Rao <[email protected]>
---
.../transaction/TransactionStateManager.scala | 2 +-
.../transaction/TransactionStateManagerTest.scala | 65 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 5abd9768767..747d3f63eb9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -479,7 +479,6 @@ class TransactionStateManager(brokerId: Int,
case Some(txnMetadata) =>
loadedTransactions.put(transactionalId, txnMetadata)
}
- currOffset = batch.nextOffset
case unknownKey: UnknownKey =>
warn(s"Unknown message key with version
${unknownKey.version}" +
@@ -487,6 +486,7 @@ class TransactionStateManager(brokerId: Int,
"It could be a left over from an aborted upgrade.")
}
}
+ currOffset = batch.nextOffset
}
}
} catch {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index a65a363f9f5..36dcaaa7e60 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -938,6 +938,71 @@ class TransactionStateManagerTest {
assertEquals(0, transactionManager.loadingPartitions.size)
}
+ // Builds MemoryRecords holding a single batch header with no records, spanning
+ // offsets baseOffset..lastOffset; used to reproduce a segment that ends with an empty batch.
+ private def createEmptyBatch(baseOffset: Long, lastOffset: Long):
MemoryRecords = {
+ // Header only: no records follow, so the buffer is sized to the batch overhead alone.
+ val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+ // NOTE(review): the trailing `false, false` look like the isTransactional / isControlBatch flags — confirm against writeEmptyHeader.
+ DefaultRecordBatch.writeEmptyHeader(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset,
lastOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ TimestampType.CREATE_TIME, System.currentTimeMillis, false, false)
+ // Flip so the header bytes just written become readable.
+ buffer.flip
+ MemoryRecords.readableRecords(buffer)
+ }
+
+ // Regression test: loading must not get stuck when a log segment ends with an empty batch.
+ @Test
+ def testLoadTransactionMetadataContainingSegmentEndingWithEmptyBatch(): Unit
= {
+ // Simulate a log with two segments where the first segment ends with an empty batch.
+ txnMetadata1.state = PrepareCommit
+ txnMetadata1.addPartitions(Set[TopicPartition](new
TopicPartition("topic1", 0)))
+ txnMetadata2.state = Ongoing
+ txnMetadata2.addPartitions(Set[TopicPartition](new
TopicPartition("topic2", 0)))
+
+ // Create the first segment, which contains two batches.
+ // The first batch (offset 0) holds one transactional record for txnMetadata1.
+ val txnRecords1 = new SimpleRecord(txnMessageKeyBytes1,
TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), TV_2))
+ val records1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
Compression.NONE, TimestampType.CREATE_TIME, txnRecords1)
+ // The second batch is empty and covers only offset 1, so the next segment starts at offset 2.
+ val records2 = createEmptyBatch(1L, 1L)
+
+ // Concatenate both batches so a single read at offset 0 returns the whole first segment.
+ val combinedBuffer = ByteBuffer.allocate(records1.buffer.limit +
records2.buffer.limit)
+ combinedBuffer.put(records1.buffer)
+ combinedBuffer.put(records2.buffer)
+ combinedBuffer.flip
+ val firstSegmentRecords = MemoryRecords.readableRecords(combinedBuffer)
+
+ // Create the second segment, which contains one batch starting at offset 2.
+ val txnRecords3 = new SimpleRecord(txnMessageKeyBytes2,
TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), TV_2))
+ val secondSegmentRecords =
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 2L, Compression.NONE,
TimestampType.CREATE_TIME, txnRecords3)
+
+ // Mock the txn log: log end offset is 3, and reads at offsets 0 and 2 return the two segments.
+ reset(replicaManager)
+
+ val logMock = mock(classOf[UnifiedLog])
+ when(replicaManager.getLog(topicPartition)).thenReturn(Some(logMock))
+ when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(3L))
+
+ when(logMock.logStartOffset).thenReturn(0L)
+ when(logMock.read(ArgumentMatchers.eq(0L),
+ maxLength = anyInt(),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true)))
+ .thenReturn(new FetchDataInfo(new LogOffsetMetadata(0L),
firstSegmentRecords))
+ when(logMock.read(ArgumentMatchers.eq(2L),
+ maxLength = anyInt(),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true)))
+ .thenReturn(new FetchDataInfo(new LogOffsetMetadata(2L),
secondSegmentRecords))
+
+ // Loading should complete instead of getting stuck on the trailing empty batch.
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch = 1, (_, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(1, transactionManager.transactionMetadataCache.size)
+
assertTrue(transactionManager.transactionMetadataCache.contains(partitionId))
+ // Transactions from both segments should have been loaded.
+ val txnMetadataPool =
transactionManager.transactionMetadataCache(partitionId).metadataPerTransactionalId
+ assertEquals(2, txnMetadataPool.size)
+ assertTrue(txnMetadataPool.contains(transactionalId1))
+ assertTrue(txnMetadataPool.contains(transactionalId2))
+ }
+
private def verifyMetadataDoesExistAndIsUsable(transactionalId: String):
Unit = {
transactionManager.getTransactionState(transactionalId) match {
case Left(_) => fail("shouldn't have been any errors")