Xiaobing Fang created KAFKA-17436:
-------------------------------------

             Summary: Follower index dump mismatch
                 Key: KAFKA-17436
                 URL: https://issues.apache.org/jira/browse/KAFKA-17436
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Xiaobing Fang
            Assignee: Xiaobing Fang
         Attachments: image-2024-08-28-14-42-44-152.png

When a follower appends fetched data to the log, a single index entry is written that records the physical starting position and the last offset of the entire fetched data. When the index is dumped, however, each entry is verified against the starting position and last offset of an individual batch. As a result, if the follower fetches multiple batches in one request, the index entries no longer line up with batch boundaries and DumpLogSegments reports mismatches for the index file.
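
To make the mechanism concrete, here is a minimal, self-contained Scala sketch of the mismatch. The types are simplified stand-ins rather than the actual Kafka classes; the point is only that an index entry written for a whole multi-batch append cannot satisfy a check that assumes every entry ends exactly where a single batch ends.
{code:java}
// Simplified model of the problem (toy types, not Kafka's real classes).
object IndexMismatchSketch extends App {
  case class Batch(baseOffset: Long, lastOffset: Long, position: Int)
  case class IndexEntry(offset: Long, position: Int)

  // One follower append carrying two fetched batches: the single index
  // entry records the last offset of the WHOLE append but the physical
  // position of its FIRST batch.
  val batches = Seq(Batch(0L, 9L, 0), Batch(10L, 29L, 300))
  val entry = IndexEntry(offset = 29L, position = 0)

  // DumpLogSegments-style verification: the first batch found at
  // entry.position is expected to end exactly at entry.offset.
  val firstBatchAtPos = batches.find(_.position == entry.position).get
  assert(firstBatchAtPos.lastOffset == entry.offset) // fails: 9 != 29
}
{code}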

 

Reproduction:

Unit test in DumpLogSegmentsTest (the assertion fails because dumpIndex reports mismatches):

 
{code:java}
def addMultiRecordBatches(): Unit = {
  var recordsCount = 0L
  val batches = new ArrayBuffer[BatchInfo]
  val now = System.currentTimeMillis()
  val firstBatchRecords = (0 until 10).map { i =>
    new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)
  }
  batches += BatchInfo(firstBatchRecords, true, true)
  val secondBatchRecords = (10 until 30).map { i =>
    new SimpleRecord(now + i * 3, s"message key $i".getBytes, null)
  }
  batches += BatchInfo(secondBatchRecords, true, false)
  val thirdBatchRecords = (30 until 50).map { i =>
    new SimpleRecord(now + i * 5, null, s"message value $i".getBytes)
  }
  batches += BatchInfo(thirdBatchRecords, false, true)
  val fourthBatchRecords = (50 until 60).map { i =>
    new SimpleRecord(now + i * 7, null)
  }
  batches += BatchInfo(fourthBatchRecords, false, false)

  batches.foreach { batchInfo =>
    val buf = ByteBuffer.allocate(2048)
    // Build several record batches (five records each) into one buffer...
    batchInfo.records.grouped(5).foreach { records =>
      val builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V1, Compression.NONE,
        TimestampType.CREATE_TIME, recordsCount)
      records.foreach(builder.append)
      builder.close()
      recordsCount += records.size.toLong
    }
    buf.flip()
    // ...and append them all in a single follower append, as a fetch would,
    // so one index entry can end up covering multiple batches.
    log.appendAsFollower(MemoryRecords.readableRecords(buf.slice()))
  }
  // Flush, but don't close so that the indexes are not trimmed and contain some zero entries
  log.flush(false)
}

@Test
def testDumpIndexMismatches2(): Unit = {
  log = createTestLog
  addMultiRecordBatches()
  val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
  // This assertion currently fails: dumpIndex records mismatches because the
  // index entries do not line up with individual batch boundaries.
  DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true,
    offsetMismatches, Int.MaxValue)
  assertEquals(Map.empty, offsetMismatches)
} {code}
System test:
 # start the cluster and produce records
 # stop a follower and delete all of its log data
 # restart the follower so that it fetches the data back from the leader

Result:
 * the follower's index file is smaller than the leader's
 * dumping the follower's index reports index mismatch errors (see the command sketch below)
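
For reference, the follower's index can be dumped with the standard tool; the path below is a placeholder for the actual segment index file:
{code:bash}
bin/kafka-dump-log.sh --files /path/to/follower-log-dir/topic-0/00000000000000000000.index
{code}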

!image-2024-08-28-14-42-44-152.png!

 

Solution:
 # fix DumpLogSegments so that it does not report errors for index entries that span multiple batches
 # after the follower fetches multiple batches, write one index entry per batch instead of a single entry for the whole fetch (see the sketch below)
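
A toy continuation of the sketch above, illustrating why option 2 satisfies the check (a hypothetical model, not the real follower append path): with one entry per fetched batch, every entry's offset is the last offset of the batch that starts at that entry's position.
{code:java}
// Toy model of option 2: index each fetched batch individually.
object PerBatchIndexSketch extends App {
  case class Batch(baseOffset: Long, lastOffset: Long, position: Int)
  case class IndexEntry(offset: Long, position: Int)

  val fetched = Seq(Batch(0L, 9L, 0), Batch(10L, 29L, 300))

  // One index entry per batch instead of one for the whole fetch.
  val entries = fetched.map(b => IndexEntry(b.lastOffset, b.position))

  // The DumpLogSegments-style check now holds for every entry.
  entries.foreach { e =>
    val batchAtPos = fetched.find(_.position == e.position).get
    assert(batchAtPos.lastOffset == e.offset)
  }
}
{code}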

 

Which of the two solutions is recommended? I can submit the fix.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
