This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 3bcfb64ea4e KAFKA-17076 retain last batch in each round of cleaning (#20535)
3bcfb64ea4e is described below
commit 3bcfb64ea4eb2786ac8c027551b02195fc4c297b
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Nov 24 09:48:15 2025 -0800
KAFKA-17076 retain last batch in each round of cleaning (#20535)
In each round of cleaning, retain the last batch even if it's empty, so
that logEndOffset info will not get lost after compaction.
Reviewers: Jun Rao <[email protected]>, PoAn Yang <[email protected]>,
Chia-Ping Tsai <[email protected]>
Co-authored-by: Vincent Jiang <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 16 ++++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 67 ++++++++++++++++------
2 files changed, 62 insertions(+), 21 deletions(-)
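Before the diff, a brief illustration may help. The sketch below is not the committed code; it is a minimal, self-contained Scala rendering of the rule this patch adds: when a batch's nextOffset equals the upper bound offset of the current cleaning round, the cleaner retains the batch header even if every record in it is filtered out, so the partition's logEndOffset survives compaction. The Batch case class and the standalone batchRetention function are simplified stand-ins; only the parameter name upperBoundOffsetOfCleaningRound and the BatchRetention cases mirror the diff that follows.

    // Simplified stand-ins; the real cleaner decides this inside the RecordFilter shown in the diff below.
    sealed trait BatchRetention
    object BatchRetention {
      case object DELETE extends BatchRetention        // drop the whole batch
      case object DELETE_EMPTY extends BatchRetention  // drop the batch only if it ends up empty
      case object RETAIN_EMPTY extends BatchRetention  // keep the batch header even if it ends up empty
    }

    case class Batch(hasProducerId: Boolean, nextOffset: Long)

    // Mirrors the decision added by this commit: the batch that closes the cleaning
    // round is always retained (possibly empty) so the log end offset is preserved.
    def batchRetention(batch: Batch,
                       isBatchLastRecordOfProducer: Boolean,
                       discardBatchRecords: Boolean,
                       upperBoundOffsetOfCleaningRound: Long): BatchRetention =
      if (batch.hasProducerId && isBatchLastRecordOfProducer)
        BatchRetention.RETAIN_EMPTY
      else if (batch.nextOffset == upperBoundOffsetOfCleaningRound)
        BatchRetention.RETAIN_EMPTY
      else if (discardBatchRecords)
        BatchRetention.DELETE
      else
        BatchRetention.DELETE_EMPTY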
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c39181338e5..c5f751f1343 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -624,7 +624,7 @@ private[log] class Cleaner(val id: Int,
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
+ cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset)
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
@@ -645,6 +645,7 @@ private[log] class Cleaner(val id: Int,
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
* @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
+ * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning
*/
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
@@ -652,7 +653,8 @@ private[log] class Cleaner(val id: Int,
currentTime: Long,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata,
- legacyDeleteHorizonMs: Long): Unit = {
+ legacyDeleteHorizonMs: Long,
+ upperBoundOffsetOfCleaningRound: Long): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -682,7 +684,7 @@ private[log] class Cleaner(val id: Int,
try {
cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
- log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+ log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, upperBoundOffsetOfCleaningRound, stats, currentTime = currentTime)
} catch {
case e: LogSegmentOffsetOverflowException =>
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -728,6 +730,7 @@ private[log] class Cleaner(val id: Int,
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
* @param lastRecordsOfActiveProducers The active producers and its last data offset
+ * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in the source segment
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*/
@@ -740,6 +743,7 @@ private[log] class Cleaner(val id: Int,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: mutable.Map[Long, LastRecord],
+ upperBoundOffsetOfCleaningRound: Long,
stats: CleanerStats,
currentTime: Long): Unit = {
val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
@@ -774,7 +778,11 @@ private[log] class Cleaner(val id: Int,
val batchRetention: BatchRetention =
if (batch.hasProducerId && isBatchLastRecordOfProducer)
BatchRetention.RETAIN_EMPTY
- else if (discardBatchRecords)
+ else if (batch.nextOffset == upperBoundOffsetOfCleaningRound) {
+ // retain the last batch of the cleaning round, even if it's empty, so that last offset information
+ // is not lost after cleaning.
+ BatchRetention.RETAIN_EMPTY
+ } else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
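For reference, a hedged sketch of how a caller supplies the new argument; the production path passes the cleaning round's upperBoundOffset (see the hunk at line 624 above), while the tests below pass segments.last.readNextOffset. The cleaner, log and offsetMap values are assumed to come from a test fixture like the one in LogCleanerTest.

    // Assumed fixture values (cleaner, log, offsetMap), as in LogCleanerTest.
    val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
    // Upper bound of this cleaning round: the offset after the last batch being cleaned.
    val upperBoundOffsetOfCleaningRound = segments.last.readNextOffset
    cleaner.cleanSegments(log, segments, offsetMap, 0L, new CleanerStats(),
      new CleanedTransactionMetadata, -1, upperBoundOffsetOfCleaningRound)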
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 734845c440e..66341c72577 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -168,7 +168,7 @@ class LogCleanerTest extends Logging {
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
- cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@@ -256,7 +256,7 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
// Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -423,7 +423,7 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats(time)
cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
+ cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue, segments.last.readNextOffset)
dirtyOffset = offsetMap.latestOffset + 1
}
@@ -925,7 +925,7 @@ class LogCleanerTest extends Logging {
// clean the log
val stats = new CleanerStats()
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -938,7 +938,7 @@ class LogCleanerTest extends Logging {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -957,7 +957,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -974,7 +974,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -1356,12 +1356,42 @@ class LogCleanerTest extends Logging {
val keys = LogTestUtils.keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
+ val segments = log.logSegments.asScala.take(3).toSeq
assertThrows(classOf[LogCleaningAbortedException], () =>
- cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, new CleanerStats(),
+ new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
)
}
+ @Test
+ def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ // append messages to the log until we have four segments
+ while (log.numberOfSegments < 4)
+ log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
+ val keysFound = LogTestUtils.keysInLog(log)
+ assertEquals(0L until log.logEndOffset, keysFound)
+
+ // pretend all keys are deleted
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+ // clean the log
+ val segments = log.logSegments.asScala.take(3).toSeq
+ val stats = new CleanerStats()
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ assertEquals(2, log.logSegments.size)
+ assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, "one batch should be retained in the cleaned segment")
+ val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
+ assertEquals(log.logSegments.asScala.last.baseOffset - 1, retainedBatch.lastOffset, "the retained batch should be the last batch")
+ assertFalse(retainedBatch.iterator.hasNext, "the retained batch should be an empty batch")
+ }
+
/**
* Validate the logic for grouping log segments together for cleaning
*/
@@ -1618,16 +1648,17 @@ class LogCleanerTest extends Logging {
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
// Clean each segment now that split is complete.
+ val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
for (segmentToClean <- log.logSegments.asScala)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
log.close()
@@ -1666,9 +1697,11 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
+ val upperBoundOffset = log.activeSegment.baseOffset
+
// clean the log
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1684,7 +1717,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1701,7 +1734,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1723,7 +1756,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1741,7 +1774,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1759,7 +1792,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)