This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a577d30d382 KAFKA-17076: retain last batch in each round of cleaning (#17193)
a577d30d382 is described below
commit a577d30d3828b360915f8982f7c85fc1c378c8c9
Author: Vincent Jiang <[email protected]>
AuthorDate: Tue Sep 24 21:20:19 2024 -0700
KAFKA-17076: retain last batch in each round of cleaning (#17193)
In each round of cleaning, retain the last batch even if it's empty, so
that logEndOffset info will not get lost after compaction.
Reviewers: Jun Rao <[email protected]>
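To make the intent concrete, below is a minimal, illustrative sketch of the retention decision this patch adds. It uses simplified stand-in types (Batch, BatchRetention, batchRetention) rather than the real Cleaner/RecordFilter classes; the actual change is in cleanInto in the diff that follows.

    // Illustrative sketch only, not the actual Kafka code.
    // Batch and BatchRetention are simplified stand-ins for RecordBatch and
    // RecordFilter.BatchRetention.
    object RetentionSketch {
      sealed trait BatchRetention
      object BatchRetention {
        case object RETAIN_EMPTY extends BatchRetention  // keep the batch header even if every record is filtered out
        case object DELETE extends BatchRetention        // drop the batch outright
        case object DELETE_EMPTY extends BatchRetention  // drop the batch only if it ends up empty
      }

      // Simplified view of a record batch: just the fields the decision needs.
      final case class Batch(hasProducerId: Boolean, isLastRecordOfProducer: Boolean, nextOffset: Long)

      def batchRetention(batch: Batch, discardBatchRecords: Boolean, upperBoundOffsetOfCleaningRound: Long): BatchRetention =
        if (batch.hasProducerId && batch.isLastRecordOfProducer)
          BatchRetention.RETAIN_EMPTY
        else if (batch.nextOffset == upperBoundOffsetOfCleaningRound)
          // new case: keep the last batch of the cleaning round even if empty,
          // so the log end offset is not lost after compaction
          BatchRetention.RETAIN_EMPTY
        else if (discardBatchRecords)
          BatchRetention.DELETE
        else
          BatchRetention.DELETE_EMPTY

      def main(args: Array[String]): Unit = {
        // the final batch of the round (nextOffset == upper bound) is kept as an empty batch
        println(batchRetention(Batch(hasProducerId = false, isLastRecordOfProducer = false, nextOffset = 100L),
          discardBatchRecords = true, upperBoundOffsetOfCleaningRound = 100L)) // RETAIN_EMPTY
        // earlier fully-discarded batches are still deleted
        println(batchRetention(Batch(hasProducerId = false, isLastRecordOfProducer = false, nextOffset = 50L),
          discardBatchRecords = true, upperBoundOffsetOfCleaningRound = 100L)) // DELETE
      }
    }

Keying the check on batch.nextOffset reaching the round's upper bound (rather than on the batch's position within a segment) keeps it valid even when every record in the final batch is removed, so the cleaned log still ends at the same offset.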
---
core/src/main/scala/kafka/log/LogCleaner.scala | 16 ++++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 67 ++++++++++++++++------
2 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 1265e979373..4f8d545be60 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -624,7 +624,7 @@ private[log] class Cleaner(val id: Int,
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
+ cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset)
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
@@ -645,6 +645,7 @@ private[log] class Cleaner(val id: Int,
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
* @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
+ * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning
*/
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
@@ -652,7 +653,8 @@ private[log] class Cleaner(val id: Int,
currentTime: Long,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata,
- legacyDeleteHorizonMs: Long): Unit = {
+ legacyDeleteHorizonMs: Long,
+ upperBoundOffsetOfCleaningRound: Long): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -682,7 +684,7 @@ private[log] class Cleaner(val id: Int,
try {
cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
- log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+ log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, upperBoundOffsetOfCleaningRound, stats, currentTime = currentTime)
} catch {
case e: LogSegmentOffsetOverflowException =>
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -728,6 +730,7 @@ private[log] class Cleaner(val id: Int,
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
* @param lastRecordsOfActiveProducers The active producers and its last data offset
+ * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in the source segment
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*/
@@ -740,6 +743,7 @@ private[log] class Cleaner(val id: Int,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: mutable.Map[Long, LastRecord],
+ upperBoundOffsetOfCleaningRound: Long,
stats: CleanerStats,
currentTime: Long): Unit = {
val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
@@ -774,7 +778,11 @@ private[log] class Cleaner(val id: Int,
val batchRetention: BatchRetention =
if (batch.hasProducerId && isBatchLastRecordOfProducer)
BatchRetention.RETAIN_EMPTY
- else if (discardBatchRecords)
+ else if (batch.nextOffset == upperBoundOffsetOfCleaningRound) {
+ // retain the last batch of the cleaning round, even if it's empty, so that last offset information
+ // is not lost after cleaning.
+ BatchRetention.RETAIN_EMPTY
+ } else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index a4b001d0a6a..63c02224f18 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -168,7 +168,7 @@ class LogCleanerTest extends Logging {
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
- cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@@ -258,7 +258,7 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
// Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -425,7 +425,7 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats(time)
cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
+ cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue, segments.last.readNextOffset)
dirtyOffset = offsetMap.latestOffset + 1
}
@@ -927,7 +927,7 @@ class LogCleanerTest extends Logging {
// clean the log
val stats = new CleanerStats()
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -940,7 +940,7 @@ class LogCleanerTest extends Logging {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -959,7 +959,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -976,7 +976,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -1358,12 +1358,42 @@ class LogCleanerTest extends Logging {
val keys = LogTestUtils.keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
+ val segments = log.logSegments.asScala.take(3).toSeq
assertThrows(classOf[LogCleaningAbortedException], () =>
- cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, new CleanerStats(),
+ new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
)
}
+ @Test
+ def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ // append messages to the log until we have four segments
+ while (log.numberOfSegments < 4)
+ log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
+ val keysFound = LogTestUtils.keysInLog(log)
+ assertEquals(0L until log.logEndOffset, keysFound)
+
+ // pretend all keys are deleted
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+ // clean the log
+ val segments = log.logSegments.asScala.take(3).toSeq
+ val stats = new CleanerStats()
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ assertEquals(2, log.logSegments.size)
+ assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, "one batch should be retained in the cleaned segment")
+ val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
+ assertEquals(log.logSegments.asScala.last.baseOffset - 1, retainedBatch.lastOffset, "the retained batch should be the last batch")
+ assertFalse(retainedBatch.iterator.hasNext, "the retained batch should be an empty batch")
+ }
+
/**
* Validate the logic for grouping log segments together for cleaning
*/
@@ -1620,16 +1650,17 @@ class LogCleanerTest extends Logging {
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
// Clean each segment now that split is complete.
+ val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
for (segmentToClean <- log.logSegments.asScala)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
log.close()
@@ -1668,9 +1699,11 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
+ val upperBoundOffset = log.activeSegment.baseOffset
+
// clean the log
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1686,7 +1719,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1703,7 +1736,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1725,7 +1758,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1743,7 +1776,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1761,7 +1794,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)