This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04d34f3676c KAFKA-17124: Fix flaky 
DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset (#16580)
04d34f3676c is described below

commit 04d34f3676c5e760fc1adc01bd988dbdd219edde
Author: Federico Valeri <[email protected]>
AuthorDate: Thu Jul 18 04:13:29 2024 +0200

    KAFKA-17124: Fix flaky 
DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset (#16580)
    
    This changes should fix the flakiness reported for 
DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset.
    I was not able to reproduce locally, but the issue was that the second 
segment was not created in time:
    
    Missing required argument "[files]"
    
    The fix consists of getting the log path directly from the rolled segment.
    
    We were also creating the log twice, and that was producing this warning:
    
    [2024-07-12 00:57:28,368] WARN [LocalLog partition=kafka-832386, 
dir=/tmp/kafka-2956913950351159820] Trying to roll a new log segment with start 
offset 0 =max(provided offset = Some(0), LEO = 0) while it already exists and 
is active with size 0. Size of time index: 873813, size of offset index: 
1310720. (kafka.log.LocalLog:70)
    
    This is also fixed.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 65 +++++++++++++---------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 5ddb0399029..977f9dcb3cb 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, 
ObjectSerializationCache}
-import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
+import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, 
SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.{CoordinatorRecord, 
CoordinatorRecordSerde}
 import 
org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, 
ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, 
GroupMetadataValue}
@@ -52,7 +52,7 @@ import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.snapshot.RecordsSnapshotWriter
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, 
LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, Test}
 
 import java.nio.file.attribute.PosixFilePermissions
 import java.nio.file.{AccessDeniedException, Files, NoSuchFileException, Paths}
@@ -73,12 +73,14 @@ class DumpLogSegmentsTest {
   val indexFilePath = s"$logDir/$segmentName.index"
   val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
   val time = new MockTime(0, 0)
-
-  val batches = new ArrayBuffer[BatchInfo]
   var log: UnifiedLog = _
 
-  @BeforeEach
-  def setUp(): Unit = {
+  @AfterEach
+  def afterEach(): Unit = {
+    Option(log).foreach(log => Utils.closeQuietly(log, "UnifiedLog"))
+  }
+
+  private def createTestLog = {
     val props = new Properties
     props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
     log = UnifiedLog(
@@ -96,9 +98,10 @@ class DumpLogSegmentsTest {
       topicId = None,
       keepPartitionMetadataFile = true
     )
+    log
   }
 
-  def addSimpleRecords(): Unit = {
+  private def addSimpleRecords(log: UnifiedLog, batches: 
ArrayBuffer[BatchInfo]): Unit = {
     val now = System.currentTimeMillis()
     val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 
2, s"message key $i".getBytes, s"message value $i".getBytes)}
     batches += BatchInfo(firstBatchRecords, hasKeys = true, hasValues = true)
@@ -117,14 +120,10 @@ class DumpLogSegmentsTest {
     log.flush(false)
   }
 
-  @AfterEach
-  def tearDown(): Unit = {
-    Utils.closeQuietly(log, "UnifiedLog")
-    Utils.delete(tmpDir)
-  }
-
   @Test
   def testBatchAndRecordMetadataOutput(): Unit = {
+    log = createTestLog
+
     log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 0,
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes)
@@ -154,12 +153,15 @@ class DumpLogSegmentsTest {
       new EndTransactionMarker(ControlRecordType.COMMIT, 100)
     ), origin = AppendOrigin.COORDINATOR, leaderEpoch = 7)
 
-    assertDumpLogRecordMetadata()
+    assertDumpLogRecordMetadata(log)
   }
 
   @Test
   def testPrintDataLog(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+    
     def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: 
Array[String]): Unit = {
       def isBatch(index: Int): Boolean = {
         var i = 0
@@ -229,7 +231,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpIndexMismatches(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+    
     val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
     DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = 
false, verifyOnly = true, offsetMismatches,
       Int.MaxValue)
@@ -238,7 +243,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpTimeIndexErrors(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+    
     val errors = new TimeIndexDumpErrors
     DumpLogSegments.dumpTimeIndex(new File(timeIndexFilePath), indexSanityOnly 
= false, verifyOnly = true, errors)
     assertEquals(Map.empty, errors.misMatchesForTimeIndexFilesMap)
@@ -384,18 +392,18 @@ class DumpLogSegmentsTest {
       new SimpleRecord(null, new RemoteLogMetadataSerde().serialize(message))
     }).toArray
     
-    val memoryRecordsSizeInBytes = MemoryRecords.withRecords(Compression.NONE, 
metadataRecords:_*).sizeInBytes()
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
memoryRecordsSizeInBytes)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
     log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, 
time.scheduler, time)
     log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 
metadataRecords:_*), leaderEpoch = 0)
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 
metadataRecords:_*), leaderEpoch = 0)
+    val secondSegment = log.roll();
+    secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, 
MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+    secondSegment.flush()
     log.flush(true)
-
-    val logPaths = 
logDir.listFiles.filter(_.getName.endsWith(".log")).map(_.getAbsolutePath)
+    
     val expectedDeletePayload = 
String.format("RemotePartitionDeleteMetadata{topicPartition=%s:%s-0, " +
       "state=DELETE_PARTITION_MARKED, eventTimestampMs=0, brokerId=0}", 
topicId, topicName)
 
-    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", 
"--files", logPaths(1)))
+    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", 
"--files", secondSegment.log().file().getAbsolutePath))
     assertTrue(batchCount(output) == 1)
     assertTrue(recordCount(output) == 1)
     assertTrue(output.contains("Log starting offset: 1"))
@@ -584,6 +592,8 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpEmptyIndex(): Unit = {
+    log = createTestLog
+    
     val indexFile = new File(indexFilePath)
     new PrintWriter(indexFile).close()
     val expectOutput = s"$indexFile is empty.\n"
@@ -605,7 +615,10 @@ class DumpLogSegmentsTest {
 
   @Test
   def testPrintDataLogPartialBatches(): Unit = {
-    addSimpleRecords()
+    log = createTestLog
+    val batches = new ArrayBuffer[BatchInfo]
+    addSimpleRecords(log, batches)
+    
     val totalBatches = batches.size
     val partialBatches = totalBatches / 2
 
@@ -914,7 +927,7 @@ class DumpLogSegmentsTest {
     fields.toMap
   }
 
-  private def assertDumpLogRecordMetadata(): Unit = {
+  private def assertDumpLogRecordMetadata(log: UnifiedLog): Unit = {
     val logReadInfo = log.read(
       startOffset = 0,
       maxLength = Int.MaxValue,
@@ -977,4 +990,4 @@ object DumpLogSegmentsTest {
   class TestDecoderWithoutVerifiableProperties() extends 
kafka.serializer.Decoder[Array[Byte]] {
     override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
   }
-}
\ No newline at end of file
+}

Reply via email to