This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08c876c1fe5 Fix TransactionStateManager handling of empty batch when 
loading transaction metadata (#17688)
08c876c1fe5 is described below

commit 08c876c1fe5b32d8301b2cc4cabc53f2d3088312
Author: Vincent Jiang <[email protected]>
AuthorDate: Tue Nov 5 14:22:20 2024 -0800

    Fix TransactionStateManager handling of empty batch when loading 
transaction metadata (#17688)
    
    When loading transaction metadata from a transaction log partition, if the 
partition contains a segment ending with an empty batch, the "currOffset" update 
logic is skipped for that last batch. Because "currOffset" is never advanced 
to the offset following the last batch, the 
TransactionStateManager.loadTransactionMetadata method gets stuck in its 
"while" loop.
    
    This change fixes the issue by updating "currOffset" after processing each 
batch, whether the batch is empty or not.
    
    Reviewers: Justine Olshan <[email protected]>, Jun Rao <[email protected]>
---
 .../transaction/TransactionStateManager.scala      |  2 +-
 .../transaction/TransactionStateManagerTest.scala  | 65 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 5abd9768767..747d3f63eb9 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -479,7 +479,6 @@ class TransactionStateManager(brokerId: Int,
                       case Some(txnMetadata) =>
                         loadedTransactions.put(transactionalId, txnMetadata)
                     }
-                    currOffset = batch.nextOffset
 
                   case unknownKey: UnknownKey =>
                     warn(s"Unknown message key with version 
${unknownKey.version}" +
@@ -487,6 +486,7 @@ class TransactionStateManager(brokerId: Int,
                       "It could be a left over from an aborted upgrade.")
                 }
               }
+              currOffset = batch.nextOffset
             }
           }
         } catch {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index a65a363f9f5..36dcaaa7e60 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -938,6 +938,71 @@ class TransactionStateManagerTest {
     assertEquals(0, transactionManager.loadingPartitions.size)
   }
 
+  private def createEmptyBatch(baseOffset: Long, lastOffset: Long): 
MemoryRecords = {
+    val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+    DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, 
lastOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      TimestampType.CREATE_TIME, System.currentTimeMillis, false, false)
+    buffer.flip
+    MemoryRecords.readableRecords(buffer)
+  }
+
+  @Test
+  def testLoadTransactionMetadataContainingSegmentEndingWithEmptyBatch(): Unit 
= {
+    // Simulate a case where a log contains two segments and the first segment 
ending with an empty batch.
+    txnMetadata1.state = PrepareCommit
+    txnMetadata1.addPartitions(Set[TopicPartition](new 
TopicPartition("topic1", 0)))
+    txnMetadata2.state = Ongoing
+    txnMetadata2.addPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)))
+
+    // Create the first segment which contains two batches.
+    // The first batch has one transactional record
+    val txnRecords1 = new SimpleRecord(txnMessageKeyBytes1, 
TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), TV_2))
+    val records1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, 
Compression.NONE, TimestampType.CREATE_TIME, txnRecords1)
+    // The second batch is an empty batch.
+    val records2 = createEmptyBatch(1L, 1L)
+
+    val combinedBuffer = ByteBuffer.allocate(records1.buffer.limit + 
records2.buffer.limit)
+    combinedBuffer.put(records1.buffer)
+    combinedBuffer.put(records2.buffer)
+    combinedBuffer.flip
+    val firstSegmentRecords = MemoryRecords.readableRecords(combinedBuffer)
+
+    // Create the second segment which contains one batch
+    val txnRecords3 = new SimpleRecord(txnMessageKeyBytes2, 
TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), TV_2))
+    val secondSegmentRecords = 
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 2L, Compression.NONE, 
TimestampType.CREATE_TIME, txnRecords3)
+
+    // Prepare a txn log
+    reset(replicaManager)
+
+    val logMock = mock(classOf[UnifiedLog])
+    when(replicaManager.getLog(topicPartition)).thenReturn(Some(logMock))
+    when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(3L))
+
+    when(logMock.logStartOffset).thenReturn(0L)
+    when(logMock.read(ArgumentMatchers.eq(0L),
+      maxLength = anyInt(),
+      isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+      minOneMessage = ArgumentMatchers.eq(true)))
+      .thenReturn(new FetchDataInfo(new LogOffsetMetadata(0L), 
firstSegmentRecords))
+    when(logMock.read(ArgumentMatchers.eq(2L),
+      maxLength = anyInt(),
+      isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+      minOneMessage = ArgumentMatchers.eq(true)))
+      .thenReturn(new FetchDataInfo(new LogOffsetMetadata(2L), 
secondSegmentRecords))
+
+    // Load transactions should not stuck.
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch = 1, (_, _, _, _) => ())
+    assertEquals(0, transactionManager.loadingPartitions.size)
+    assertEquals(1, transactionManager.transactionMetadataCache.size)
+    
assertTrue(transactionManager.transactionMetadataCache.contains(partitionId))
+    // all transactions should have been loaded
+    val txnMetadataPool = 
transactionManager.transactionMetadataCache(partitionId).metadataPerTransactionalId
+    assertEquals(2, txnMetadataPool.size)
+    assertTrue(txnMetadataPool.contains(transactionalId1))
+    assertTrue(txnMetadataPool.contains(transactionalId2))
+  }
+
   private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): 
Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(_) => fail("shouldn't have been any errors")

Reply via email to