This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74803452309 KAFKA-20452 Avoid creating unnecessary empty batches in
LogCleaner below the High Watermark (#22089)
74803452309 is described below
commit 748034523093f9076223a9ef7b8a966da071b28f
Author: Jiayao Sun <[email protected]>
AuthorDate: Fri May 8 14:16:21 2026 +1200
KAFKA-20452 Avoid creating unnecessary empty batches in LogCleaner below
the High Watermark (#22089)
This patch replaces `upperBoundOffset` with `highWatermark` in
`Cleaner.cleanSegments()` / `cleanInto()`, updates the related Javadoc and
comments, and adds a test to verify that empty batches are not retained
when the high watermark is beyond the cleaned range.
Reviewers: Jun Rao <[email protected]>, PoAn Yang <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 90 ++++++++++++++++------
.../kafka/storage/internals/log/Cleaner.java | 21 +++--
2 files changed, 77 insertions(+), 34 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index e1aa985e01c..aeb5d984749 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -171,7 +171,8 @@ class LogCleanerTest extends Logging {
val stats = new CleanerStats(Time.SYSTEM)
val expectedBytesRead = segments.map(_.size).sum
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
- cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ log.updateHighWatermark(segments.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new
CleanedTransactionMetadata, -1)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@@ -262,7 +263,8 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0,
log.activeSegment.baseOffset).asScala.toSeq
val stats = new CleanerStats(Time.SYSTEM)
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap,
stats)
- cleaner.cleanSegments(log, segments.asJava, offsetMap, 0L, stats, new
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ log.updateHighWatermark(segments.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, offsetMap, 0L, stats, new
CleanedTransactionMetadata, -1)
// Validate based on the file name that log segment file is renamed
exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -431,7 +433,8 @@ class LogCleanerTest extends Logging {
val segments = log.logSegments(0,
log.activeSegment.baseOffset).asScala.toSeq
val stats = new CleanerStats(time)
cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset,
offsetMap, stats)
- cleaner.cleanSegments(log, segments.asJava, offsetMap,
time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue,
segments.last.readNextOffset)
+ log.updateHighWatermark(segments.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, offsetMap,
time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
dirtyOffset = offsetMap.latestOffset + 1
}
@@ -940,7 +943,8 @@ class LogCleanerTest extends Logging {
// clean the log
val stats = new CleanerStats(Time.SYSTEM)
- cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
map, 0L, stats, new CleanedTransactionMetadata, -1,
log.logSegments.asScala.head.readNextOffset)
+ log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
map, 0L, stats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -953,7 +957,8 @@ class LogCleanerTest extends Logging {
val (log, offsetMap) =
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
+ log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k =>
!offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -971,8 +976,9 @@ class LogCleanerTest extends Logging {
file.close()
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+ log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1)
)
}
@@ -988,9 +994,10 @@ class LogCleanerTest extends Logging {
file.close()
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
- )
+
+ assertThrows(classOf[CorruptRecordException], () => {
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1)
+ })
}
def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int):
(UnifiedLog, FakeOffsetMap) = {
@@ -1371,12 +1378,17 @@ class LogCleanerTest extends Logging {
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
val segments = log.logSegments.asScala.take(3).toSeq
+ log.updateHighWatermark(segments.last.readNextOffset)
assertThrows(classOf[LogCleaningAbortedException], () =>
- cleaner.cleanSegments(log, segments.asJava, map, 0L, new
CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, map, 0L, new
CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
)
}
+ /**
+ * Test that if a cleaned batch's next offset equals the high watermark, the
batch is retained
+ * even if it is empty (i.e. all its records have been cleaned away). This
ensures that the last
+ * offset information before the high watermark is not lost after log
cleaning.
+ */
@Test
def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
@@ -1398,7 +1410,8 @@ class LogCleanerTest extends Logging {
// clean the log
val segments = log.logSegments.asScala.take(3).toSeq
val stats = new CleanerStats(Time.SYSTEM)
- cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ log.updateHighWatermark(segments.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new
CleanedTransactionMetadata, -1)
assertEquals(2, log.logSegments.size)
assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size,
"one batch should be retained in the cleaned segment")
val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
@@ -1406,6 +1419,38 @@ class LogCleanerTest extends Logging {
assertFalse(retainedBatch.iterator.hasNext, "the retained batch should be
an empty batch")
}
+ /**
+ * Test that if the high watermark is beyond the cleaned segments (i.e. no
batch's next offset
+ * equals the high watermark), empty batches produced by cleaning are NOT
retained and can be
+ * safely deleted. This is the counterpart to
[[testCleanSegmentsRetainingLastEmptyBatch]].
+ */
+ @Test
+ def testCleanSegmentsNotRetainingLastEmptyBatch(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024:
java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals,
logProps))
+
+ // append messages to the log until we have four segments
+ while (log.numberOfSegments < 4)
+ log.appendAsLeader(record(log.logEndOffset.toInt,
log.logEndOffset.toInt), 0)
+ val keysFound = LogTestUtils.keysInLog(log)
+ assertEquals(0L until log.logEndOffset, keysFound)
+
+ // pretend all keys are deleted
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+ // clean the log
+ val segments = log.logSegments.asScala.take(3).toSeq
+ val stats = new CleanerStats(Time.SYSTEM)
+ log.updateHighWatermark(log.logSegments.asScala.last.readNextOffset)
+ cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new
CleanedTransactionMetadata, -1)
+ assertEquals(2, log.logSegments.size)
+ assertEquals(0, log.logSegments.asScala.head.log.batches.asScala.size, "no
batches should remain in the cleaned segment")
+ }
+
/**
* Validate the logic for grouping log segments together for cleaning
*/
@@ -1659,9 +1704,9 @@ class LogCleanerTest extends Logging {
// Before: sourceSegment0, sourceSegment1, activeSegment = 3 total
val segmentCountBefore = log.logSegments.size
-
+ log.updateHighWatermark(sourceSegments.last.readNextOffset)
cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
- new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1,
sourceSegments.last.readNextOffset)
+ new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
// With overflow, 2 source segments → 2 cleaned segments; net segment
count unchanged.
// Without overflow, 2 source → 1 cleaned; net count would be
segmentCountBefore - 1.
@@ -1692,8 +1737,9 @@ class LogCleanerTest extends Logging {
val segmentCountBefore = log.logSegments.size
+ log.updateHighWatermark(sourceSegments.last.readNextOffset)
cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
- new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1,
sourceSegments.last.readNextOffset)
+ new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
assertEquals(segmentCountBefore, log.logSegments.size,
"offset overflow should produce 2 cleaned segments, keeping total
segment count the same")
@@ -1734,11 +1780,11 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
- val upperBoundOffset = log.activeSegment.baseOffset
+ log.updateHighWatermark(log.activeSegment.baseOffset)
// clean the log
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1754,7 +1800,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1771,7 +1817,7 @@ class LogCleanerTest extends Logging {
// clean again
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1793,7 +1839,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1811,7 +1857,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1829,7 +1875,7 @@ class LogCleanerTest extends Logging {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
- new CleanedTransactionMetadata, -1, upperBoundOffset)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index 7dfb36ce915..8f14c6e7f33 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -196,9 +196,8 @@ public class Cleaner {
log.config().maxIndexSize,
cleanable.firstUncleanableOffset()
);
-
for (List<LogSegment> group : groupedSegments) {
- cleanSegments(log, group, offsetMap, currentTime, stats,
transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
+ cleanSegments(log, group, offsetMap, currentTime, stats,
transactionMetadata, legacyDeleteHorizonMs);
}
// record buffer utilization
@@ -225,7 +224,6 @@ public class Cleaner {
* @param transactionMetadata State of ongoing transactions which is
carried between the cleaning
* of the grouped segments
* @param legacyDeleteHorizonMs The delete horizon used for tombstones
whose version is less than 2
- * @param upperBoundOffsetOfCleaningRound The upper bound offset of this
round of cleaning
*/
public void cleanSegments(UnifiedLog log,
List<LogSegment> segments,
@@ -233,10 +231,8 @@ public class Cleaner {
long currentTime,
CleanerStats stats,
CleanedTransactionMetadata transactionMetadata,
- long legacyDeleteHorizonMs,
- long upperBoundOffsetOfCleaningRound) throws
IOException {
+ long legacyDeleteHorizonMs) throws IOException {
List<LogSegment> cleanedSegments = new ArrayList<>();
-
// Create initial cleaned segment with the base offset of the first
source segment
LogSegment currentCleaned =
UnifiedLog.createNewCleanedSegment(log.dir(), log.config(),
segments.get(0).baseOffset());
transactionMetadata.setCleanedIndex(Optional.of(currentCleaned.txnIndex()));
@@ -284,7 +280,7 @@ public class Cleaner {
log.config().maxMessageSize(),
transactionMetadata,
lastOffsetOfActiveProducers,
- upperBoundOffsetOfCleaningRound,
+ log.highWatermark(),
stats,
currentTime
);
@@ -359,7 +355,8 @@ public class Cleaner {
* @param maxLogMessageSize The maximum message size of the corresponding
topic
* @param transactionMetadata The state of ongoing transactions which is
carried between the cleaning of the grouped segments
* @param lastRecordsOfActiveProducers The active producers and its last
data offset
- * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in
the source segment
+ * @param highWatermark The high watermark of the log, used to retain the
batch whose next offset equals
+ * the high watermark so that the last offset
information is not lost after cleaning
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*
@@ -376,7 +373,7 @@ public class Cleaner {
int maxLogMessageSize,
CleanedTransactionMetadata transactionMetadata,
Map<Long, LastRecord> lastRecordsOfActiveProducers,
- long upperBoundOffsetOfCleaningRound,
+ long highWatermark,
CleanerStats stats,
long currentTime) throws IOException {
MemoryRecords.RecordFilter logCleanerFilter = new
MemoryRecords.RecordFilter(currentTime, deleteRetentionMs) {
@@ -413,9 +410,9 @@ public class Cleaner {
BatchRetention batchRetention;
if (batch.hasProducerId() && isBatchLastRecordOfProducer)
batchRetention = BatchRetention.RETAIN_EMPTY;
- else if (batch.nextOffset() ==
upperBoundOffsetOfCleaningRound) {
- // retain the last batch of the cleaning round, even if
it's empty, so that last offset information
- // is not lost after cleaning.
+ else if (batch.nextOffset() == highWatermark) {
+ // This is the last batch before the high watermark.
Retain it even if empty, so that the last
+ // offset information is not lost after cleaning.
batchRetention = BatchRetention.RETAIN_EMPTY;
} else if (discardBatchRecords)
batchRetention = BatchRetention.DELETE;