This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74803452309 KAFKA-20452 Avoid creating unnecessary empty batches in LogCleaner below the High Watermark (#22089)
74803452309 is described below

commit 748034523093f9076223a9ef7b8a966da071b28f
Author: Jiayao Sun <[email protected]>
AuthorDate: Fri May 8 14:16:21 2026 +1200

    KAFKA-20452 Avoid creating unnecessary empty batches in LogCleaner below the High Watermark (#22089)
    
    This patch replaces `upperBoundOffset` with `highWatermark` in
    `Cleaner.cleanSegments()` / `cleanInto()`, updates the related Javadoc and
    comments, and adds a test to verify that empty batches are not retained
    when the high watermark is beyond the cleaned range.
    
    Reviewers: Jun Rao <[email protected]>, PoAn Yang <[email protected]>,
    Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 90 ++++++++++++++++------
 .../kafka/storage/internals/log/Cleaner.java       | 21 +++--
 2 files changed, 77 insertions(+), 34 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index e1aa985e01c..aeb5d984749 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -171,7 +171,8 @@ class LogCleanerTest extends Logging {
     val stats = new CleanerStats(Time.SYSTEM)
     val expectedBytesRead = segments.map(_.size).sum
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
-    cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new 
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+    log.updateHighWatermark(segments.last.readNextOffset)
+    cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new 
CleanedTransactionMetadata, -1)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
@@ -262,7 +263,8 @@ class LogCleanerTest extends Logging {
     val segments = log.logSegments(0, 
log.activeSegment.baseOffset).asScala.toSeq
     val stats = new CleanerStats(Time.SYSTEM)
     cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, 
stats)
-    cleaner.cleanSegments(log, segments.asJava, offsetMap, 0L, stats, new 
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+    log.updateHighWatermark(segments.last.readNextOffset)
+    cleaner.cleanSegments(log, segments.asJava, offsetMap, 0L, stats, new 
CleanedTransactionMetadata, -1)
 
     // Validate based on the file name that log segment file is renamed 
exactly once for async deletion
     assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -431,7 +433,8 @@ class LogCleanerTest extends Logging {
       val segments = log.logSegments(0, 
log.activeSegment.baseOffset).asScala.toSeq
       val stats = new CleanerStats(time)
       cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, 
offsetMap, stats)
-      cleaner.cleanSegments(log, segments.asJava, offsetMap, 
time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue, 
segments.last.readNextOffset)
+      log.updateHighWatermark(segments.last.readNextOffset)
+      cleaner.cleanSegments(log, segments.asJava, offsetMap, 
time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
       dirtyOffset = offsetMap.latestOffset + 1
     }
 
@@ -940,7 +943,8 @@ class LogCleanerTest extends Logging {
 
     // clean the log
     val stats = new CleanerStats(Time.SYSTEM)
-    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
map, 0L, stats, new CleanedTransactionMetadata, -1, 
log.logSegments.asScala.head.readNextOffset)
+    log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
map, 0L, stats, new CleanedTransactionMetadata, -1)
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -953,7 +957,8 @@ class LogCleanerTest extends Logging {
     val (log, offsetMap) = 
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
+    log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1)
     val shouldRemain = LogTestUtils.keysInLog(log).filter(k => 
!offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -971,8 +976,9 @@ class LogCleanerTest extends Logging {
     file.close()
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    log.updateHighWatermark(log.logSegments.asScala.head.readNextOffset)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1)
     )
   }
 
@@ -988,9 +994,10 @@ class LogCleanerTest extends Logging {
     file.close()
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
-    )
+
+    assertThrows(classOf[CorruptRecordException], () => {
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1)
+    })
   }
 
   def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): 
(UnifiedLog, FakeOffsetMap) = {
@@ -1371,12 +1378,17 @@ class LogCleanerTest extends Logging {
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     val segments = log.logSegments.asScala.take(3).toSeq
+    log.updateHighWatermark(segments.last.readNextOffset)
     assertThrows(classOf[LogCleaningAbortedException], () =>
-      cleaner.cleanSegments(log, segments.asJava, map, 0L, new 
CleanerStats(Time.SYSTEM),
-        new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+      cleaner.cleanSegments(log, segments.asJava, map, 0L, new 
CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
     )
   }
 
+  /**
+   * Test that if a cleaned batch's next offset equals the high watermark, the 
batch is retained
+   * even if it is empty (i.e. all its records have been cleaned away). This 
ensures that the last
+   * offset information before the high watermark is not lost after log 
cleaning.
+   */
   @Test
   def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
@@ -1398,7 +1410,8 @@ class LogCleanerTest extends Logging {
     // clean the log
     val segments = log.logSegments.asScala.take(3).toSeq
     val stats = new CleanerStats(Time.SYSTEM)
-    cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new 
CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+    log.updateHighWatermark(segments.last.readNextOffset)
+    cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new 
CleanedTransactionMetadata, -1)
     assertEquals(2, log.logSegments.size)
     assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, 
"one batch should be retained in the cleaned segment")
     val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
@@ -1406,6 +1419,38 @@ class LogCleanerTest extends Logging {
     assertFalse(retainedBatch.iterator.hasNext,  "the retained batch should be 
an empty batch")
   }
 
+  /**
+   * Test that if the high watermark is beyond the cleaned segments (i.e. no 
batch's next offset
+   * equals the high watermark), empty batches produced by cleaning are NOT 
retained and can be
+   * safely deleted. This is the counterpart to 
[[testCleanSegmentsRetainingLastEmptyBatch]].
+   */
+  @Test
+  def testCleanSegmentsNotRetainingLastEmptyBatch(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: 
java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    // append messages to the log until we have four segments
+    while (log.numberOfSegments < 4)
+      log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), 0)
+    val keysFound = LogTestUtils.keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend all keys are deleted
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val segments = log.logSegments.asScala.take(3).toSeq
+    val stats = new CleanerStats(Time.SYSTEM)
+    log.updateHighWatermark(log.logSegments.asScala.last.readNextOffset)
+    cleaner.cleanSegments(log, segments.asJava, map, 0L, stats, new 
CleanedTransactionMetadata, -1)
+    assertEquals(2, log.logSegments.size)
+    assertEquals(0, log.logSegments.asScala.head.log.batches.asScala.size, "no 
batches should remain in the cleaned segment")
+  }
+
   /**
    * Validate the logic for grouping log segments together for cleaning
    */
@@ -1659,9 +1704,9 @@ class LogCleanerTest extends Logging {
 
     // Before: sourceSegment0, sourceSegment1, activeSegment = 3 total
     val segmentCountBefore = log.logSegments.size
-
+    log.updateHighWatermark(sourceSegments.last.readNextOffset)
     cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
-      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, 
sourceSegments.last.readNextOffset)
+      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
 
     // With overflow, 2 source segments → 2 cleaned segments; net segment 
count unchanged.
     // Without overflow, 2 source → 1 cleaned; net count would be 
segmentCountBefore - 1.
@@ -1692,8 +1737,9 @@ class LogCleanerTest extends Logging {
 
     val segmentCountBefore = log.logSegments.size
 
+    log.updateHighWatermark(sourceSegments.last.readNextOffset)
     cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
-      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, 
sourceSegments.last.readNextOffset)
+      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1)
 
     assertEquals(segmentCountBefore, log.logSegments.size,
       "offset overflow should produce 2 cleaned segments, keeping total 
segment count the same")
@@ -1734,11 +1780,11 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
 
-    val upperBoundOffset = log.activeSegment.baseOffset
 
+    log.updateHighWatermark(log.activeSegment.baseOffset)
     // clean the log
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1754,7 +1800,7 @@ class LogCleanerTest extends Logging {
 
     // clean again
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1771,7 +1817,7 @@ class LogCleanerTest extends Logging {
 
     // clean again
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1793,7 +1839,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1811,7 +1857,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1829,7 +1875,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq.asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM),
-      new CleanedTransactionMetadata, -1, upperBoundOffset)
+      new CleanedTransactionMetadata, -1)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index 7dfb36ce915..8f14c6e7f33 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -196,9 +196,8 @@ public class Cleaner {
                 log.config().maxIndexSize,
                 cleanable.firstUncleanableOffset()
         );
-
         for (List<LogSegment> group : groupedSegments) {
-            cleanSegments(log, group, offsetMap, currentTime, stats, 
transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
+            cleanSegments(log, group, offsetMap, currentTime, stats, 
transactionMetadata, legacyDeleteHorizonMs);
         }
 
         // record buffer utilization
@@ -225,7 +224,6 @@ public class Cleaner {
      * @param transactionMetadata State of ongoing transactions which is 
carried between the cleaning
      *                            of the grouped segments
      * @param legacyDeleteHorizonMs The delete horizon used for tombstones 
whose version is less than 2
-     * @param upperBoundOffsetOfCleaningRound The upper bound offset of this 
round of cleaning
      */
     public void cleanSegments(UnifiedLog log,
                               List<LogSegment> segments,
@@ -233,10 +231,8 @@ public class Cleaner {
                               long currentTime,
                               CleanerStats stats,
                               CleanedTransactionMetadata transactionMetadata,
-                              long legacyDeleteHorizonMs,
-                              long upperBoundOffsetOfCleaningRound) throws 
IOException {
+                              long legacyDeleteHorizonMs) throws IOException {
         List<LogSegment> cleanedSegments = new ArrayList<>();
-
         // Create initial cleaned segment with the base offset of the first 
source segment
         LogSegment currentCleaned = 
UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), 
segments.get(0).baseOffset());
         
transactionMetadata.setCleanedIndex(Optional.of(currentCleaned.txnIndex()));
@@ -284,7 +280,7 @@ public class Cleaner {
                             log.config().maxMessageSize(),
                             transactionMetadata,
                             lastOffsetOfActiveProducers,
-                            upperBoundOffsetOfCleaningRound,
+                            log.highWatermark(),
                             stats,
                             currentTime
                     );
@@ -359,7 +355,8 @@ public class Cleaner {
      * @param maxLogMessageSize The maximum message size of the corresponding 
topic
      * @param transactionMetadata The state of ongoing transactions which is 
carried between the cleaning of the grouped segments
      * @param lastRecordsOfActiveProducers The active producers and its last 
data offset
-     * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in 
the source segment
+     * @param highWatermark The high watermark of the log, used to retain the 
batch whose next offset equals
+     *                      the high watermark so that the last offset 
information is not lost after cleaning
      * @param stats Collector for cleaning statistics
      * @param currentTime The time at which the clean was initiated
      *
@@ -376,7 +373,7 @@ public class Cleaner {
                            int maxLogMessageSize,
                            CleanedTransactionMetadata transactionMetadata,
                            Map<Long, LastRecord> lastRecordsOfActiveProducers,
-                           long upperBoundOffsetOfCleaningRound,
+                           long highWatermark,
                            CleanerStats stats,
                            long currentTime) throws IOException {
         MemoryRecords.RecordFilter logCleanerFilter = new 
MemoryRecords.RecordFilter(currentTime, deleteRetentionMs) {
@@ -413,9 +410,9 @@ public class Cleaner {
                 BatchRetention batchRetention;
                 if (batch.hasProducerId() && isBatchLastRecordOfProducer)
                     batchRetention = BatchRetention.RETAIN_EMPTY;
-                else if (batch.nextOffset() == 
upperBoundOffsetOfCleaningRound) {
-                    // retain the last batch of the cleaning round, even if 
it's empty, so that last offset information
-                    // is not lost after cleaning.
+                else if (batch.nextOffset() == highWatermark) {
+                    // This is the last batch before the high watermark. 
Retain it even if empty, so that the last
+                    // offset information is not lost after cleaning.
                     batchRetention = BatchRetention.RETAIN_EMPTY;
                 } else if (discardBatchRecords)
                     batchRetention = BatchRetention.DELETE;

Reply via email to