This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 04d34f3676c KAFKA-17124: Fix flaky DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset (#16580)
04d34f3676c is described below
commit 04d34f3676c5e760fc1adc01bd988dbdd219edde
Author: Federico Valeri <[email protected]>
AuthorDate: Thu Jul 18 04:13:29 2024 +0200
KAFKA-17124: Fix flaky DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset (#16580)
This change should fix the flakiness reported for
DumpLogSegmentsTest#testDumpRemoteLogMetadataNonZeroStartingOffset.
I was not able to reproduce the failure locally, but the issue was that the
second segment was not created in time, so the test failed with:
Missing required argument "[files]"
The fix consists of getting the log path directly from the rolled segment.
We were also creating the log twice, and that was producing this warning:
[2024-07-12 00:57:28,368] WARN [LocalLog partition=kafka-832386,
dir=/tmp/kafka-2956913950351159820] Trying to roll a new log segment with start
offset 0 =max(provided offset = Some(0), LEO = 0) while it already exists and
is active with size 0. Size of time index: 873813, size of offset index:
1310720. (kafka.log.LocalLog:70)
This is also fixed.
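In outline, the updated test now does roughly the following (a condensed
sketch assembled from the diff below; logDir, time, metadataRecords and
runDumpLogSegments come from the test fixture):

    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
    log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
    // The first batch goes through the log and lands in the first segment.
    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
    // Roll explicitly and append the second batch straight to the rolled segment,
    // so that segment is guaranteed to exist before the dump tool runs.
    val secondSegment = log.roll()
    secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
    secondSegment.flush()
    log.flush(true)
    // Dump exactly that segment's file instead of scanning logDir for *.log files.
    val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", "--files", secondSegment.log().file().getAbsolutePath))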
Reviewers: Luke Chen <[email protected]>
---
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 65 +++++++++++++---------
1 file changed, 39 insertions(+), 26 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 5ddb0399029..977f9dcb3cb 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
@@ -52,7 +52,7 @@ import org.apache.kafka.server.util.MockTime
import org.apache.kafka.snapshot.RecordsSnapshotWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, Test}
import java.nio.file.attribute.PosixFilePermissions
import java.nio.file.{AccessDeniedException, Files, NoSuchFileException, Paths}
@@ -73,12 +73,14 @@ class DumpLogSegmentsTest {
val indexFilePath = s"$logDir/$segmentName.index"
val timeIndexFilePath = s"$logDir/$segmentName.timeindex"
val time = new MockTime(0, 0)
-
- val batches = new ArrayBuffer[BatchInfo]
var log: UnifiedLog = _
- @BeforeEach
- def setUp(): Unit = {
+ @AfterEach
+ def afterEach(): Unit = {
+ Option(log).foreach(log => Utils.closeQuietly(log, "UnifiedLog"))
+ }
+
+ private def createTestLog = {
val props = new Properties
props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
log = UnifiedLog(
@@ -96,9 +98,10 @@ class DumpLogSegmentsTest {
topicId = None,
keepPartitionMetadataFile = true
)
+ log
}
- def addSimpleRecords(): Unit = {
+ private def addSimpleRecords(log: UnifiedLog, batches: ArrayBuffer[BatchInfo]): Unit = {
val now = System.currentTimeMillis()
val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
batches += BatchInfo(firstBatchRecords, hasKeys = true, hasValues = true)
@@ -117,14 +120,10 @@ class DumpLogSegmentsTest {
log.flush(false)
}
- @AfterEach
- def tearDown(): Unit = {
- Utils.closeQuietly(log, "UnifiedLog")
- Utils.delete(tmpDir)
- }
-
@Test
def testBatchAndRecordMetadataOutput(): Unit = {
+ log = createTestLog
+
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 0,
new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes)
@@ -154,12 +153,15 @@ class DumpLogSegmentsTest {
new EndTransactionMarker(ControlRecordType.COMMIT, 100)
), origin = AppendOrigin.COORDINATOR, leaderEpoch = 7)
- assertDumpLogRecordMetadata()
+ assertDumpLogRecordMetadata(log)
}
@Test
def testPrintDataLog(): Unit = {
- addSimpleRecords()
+ log = createTestLog
+ val batches = new ArrayBuffer[BatchInfo]
+ addSimpleRecords(log, batches)
+
def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
def isBatch(index: Int): Boolean = {
var i = 0
@@ -229,7 +231,10 @@ class DumpLogSegmentsTest {
@Test
def testDumpIndexMismatches(): Unit = {
- addSimpleRecords()
+ log = createTestLog
+ val batches = new ArrayBuffer[BatchInfo]
+ addSimpleRecords(log, batches)
+
val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true, offsetMismatches,
Int.MaxValue)
@@ -238,7 +243,10 @@ class DumpLogSegmentsTest {
@Test
def testDumpTimeIndexErrors(): Unit = {
- addSimpleRecords()
+ log = createTestLog
+ val batches = new ArrayBuffer[BatchInfo]
+ addSimpleRecords(log, batches)
+
val errors = new TimeIndexDumpErrors
DumpLogSegments.dumpTimeIndex(new File(timeIndexFilePath), indexSanityOnly = false, verifyOnly = true, errors)
assertEquals(Map.empty, errors.misMatchesForTimeIndexFilesMap)
@@ -384,18 +392,18 @@ class DumpLogSegmentsTest {
new SimpleRecord(null, new RemoteLogMetadataSerde().serialize(message))
}).toArray
- val memoryRecordsSizeInBytes = MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*).sizeInBytes()
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = memoryRecordsSizeInBytes)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
- log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
+ val secondSegment = log.roll();
+ secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+ secondSegment.flush()
log.flush(true)
-
- val logPaths = logDir.listFiles.filter(_.getName.endsWith(".log")).map(_.getAbsolutePath)
+
val expectedDeletePayload = String.format("RemotePartitionDeleteMetadata{topicPartition=%s:%s-0, " +
"state=DELETE_PARTITION_MARKED, eventTimestampMs=0, brokerId=0}", topicId, topicName)
- val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", "--files", logPaths(1)))
+ val output = runDumpLogSegments(Array("--remote-log-metadata-decoder", "--files", secondSegment.log().file().getAbsolutePath))
assertTrue(batchCount(output) == 1)
assertTrue(recordCount(output) == 1)
assertTrue(output.contains("Log starting offset: 1"))
@@ -584,6 +592,8 @@ class DumpLogSegmentsTest {
@Test
def testDumpEmptyIndex(): Unit = {
+ log = createTestLog
+
val indexFile = new File(indexFilePath)
new PrintWriter(indexFile).close()
val expectOutput = s"$indexFile is empty.\n"
@@ -605,7 +615,10 @@ class DumpLogSegmentsTest {
@Test
def testPrintDataLogPartialBatches(): Unit = {
- addSimpleRecords()
+ log = createTestLog
+ val batches = new ArrayBuffer[BatchInfo]
+ addSimpleRecords(log, batches)
+
val totalBatches = batches.size
val partialBatches = totalBatches / 2
@@ -914,7 +927,7 @@ class DumpLogSegmentsTest {
fields.toMap
}
- private def assertDumpLogRecordMetadata(): Unit = {
+ private def assertDumpLogRecordMetadata(log: UnifiedLog): Unit = {
val logReadInfo = log.read(
startOffset = 0,
maxLength = Int.MaxValue,
@@ -977,4 +990,4 @@ object DumpLogSegmentsTest {
class TestDecoderWithoutVerifiableProperties() extends kafka.serializer.Decoder[Array[Byte]] {
override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
-}
\ No newline at end of file
+}