This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 3bcfb64ea4e KAFKA-17076 retain last batch in each round of cleaning (#20535)
3bcfb64ea4e is described below

commit 3bcfb64ea4eb2786ac8c027551b02195fc4c297b
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Nov 24 09:48:15 2025 -0800

    KAFKA-17076 retain last batch in each round of cleaning (#20535)
    
    In each round of cleaning, retain the last batch even if it's empty, so
    that logEndOffset info will not get lost after compaction.
    
    Reviewers: Jun Rao <[email protected]>, PoAn Yang <[email protected]>,
    Chia-Ping Tsai <[email protected]>
    
    Co-authored-by: Vincent Jiang <[email protected]>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 16 ++++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 67 ++++++++++++++++------
 2 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c39181338e5..c5f751f1343 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -624,7 +624,7 @@ private[log] class Cleaner(val id: Int,
     val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
       log.config.maxIndexSize, cleanable.firstUncleanableOffset)
     for (group <- groupedSegments)
-      cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
+      cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset)
 
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
@@ -645,6 +645,7 @@ private[log] class Cleaner(val id: Int,
   * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
   *                            of the grouped segments
   * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
+   * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning
    */
   private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
@@ -652,7 +653,8 @@ private[log] class Cleaner(val id: Int,
                                  currentTime: Long,
                                  stats: CleanerStats,
                                 transactionMetadata: CleanedTransactionMetadata,
-                                 legacyDeleteHorizonMs: Long): Unit = {
+                                 legacyDeleteHorizonMs: Long,
+                                 upperBoundOffsetOfCleaningRound: Long): Unit = {
     // create a new segment with a suffix appended to the name of the log and indexes
     val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
     transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -682,7 +684,7 @@ private[log] class Cleaner(val id: Int,
 
         try {
           cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
-            log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+            log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, upperBoundOffsetOfCleaningRound, stats, currentTime = currentTime)
         } catch {
           case e: LogSegmentOffsetOverflowException =>
             // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -728,6 +730,7 @@ private[log] class Cleaner(val id: Int,
   * @param maxLogMessageSize The maximum message size of the corresponding topic
   * @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
   * @param lastRecordsOfActiveProducers The active producers and its last data offset
+   * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in the source segment
    * @param stats Collector for cleaning statistics
    * @param currentTime The time at which the clean was initiated
    */
@@ -740,6 +743,7 @@ private[log] class Cleaner(val id: Int,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                             lastRecordsOfActiveProducers: mutable.Map[Long, LastRecord],
+                             upperBoundOffsetOfCleaningRound: Long,
                              stats: CleanerStats,
                              currentTime: Long): Unit = {
     val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
@@ -774,7 +778,11 @@ private[log] class Cleaner(val id: Int,
         val batchRetention: BatchRetention =
           if (batch.hasProducerId && isBatchLastRecordOfProducer)
             BatchRetention.RETAIN_EMPTY
-          else if (discardBatchRecords)
+          else if (batch.nextOffset == upperBoundOffsetOfCleaningRound) {
+            // retain the last batch of the cleaning round, even if it's empty, so that last offset information
+            // is not lost after cleaning.
+            BatchRetention.RETAIN_EMPTY
+          } else if (discardBatchRecords)
             BatchRetention.DELETE
           else
             BatchRetention.DELETE_EMPTY
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 734845c440e..66341c72577 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -168,7 +168,7 @@ class LogCleanerTest extends Logging {
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
-    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
@@ -256,7 +256,7 @@ class LogCleanerTest extends Logging {
     val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
     val stats = new CleanerStats()
     cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
-    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
 
     // Validate based on the file name that log segment file is renamed exactly once for async deletion
     assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -423,7 +423,7 @@ class LogCleanerTest extends Logging {
       val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
       val stats = new CleanerStats(time)
       cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
-      cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
+      cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue, segments.last.readNextOffset)
       dirtyOffset = offsetMap.latestOffset + 1
     }
 
@@ -925,7 +925,7 @@ class LogCleanerTest extends Logging {
 
     // clean the log
     val stats = new CleanerStats()
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
+    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -938,7 +938,7 @@ class LogCleanerTest extends Logging {
     val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -957,7 +957,7 @@ class LogCleanerTest extends Logging {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     )
   }
 
@@ -974,7 +974,7 @@ class LogCleanerTest extends Logging {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     )
   }
 
@@ -1356,12 +1356,42 @@ class LogCleanerTest extends Logging {
     val keys = LogTestUtils.keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
+    val segments = log.logSegments.asScala.take(3).toSeq
     assertThrows(classOf[LogCleaningAbortedException], () =>
-      cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 0L, new CleanerStats(),
-        new CleanedTransactionMetadata, -1)
+      cleaner.cleanSegments(log, segments, map, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
     )
   }
 
+  @Test
+  def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages to the log until we have four segments
+    while (log.numberOfSegments < 4)
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
+    val keysFound = LogTestUtils.keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend all keys are deleted
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val segments = log.logSegments.asScala.take(3).toSeq
+    val stats = new CleanerStats()
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+    assertEquals(2, log.logSegments.size)
+    assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, "one batch should be retained in the cleaned segment")
+    val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
+    assertEquals(log.logSegments.asScala.last.baseOffset - 1, retainedBatch.lastOffset, "the retained batch should be the last batch")
+    assertFalse(retainedBatch.iterator.hasNext, "the retained batch should be an empty batch")
+  }
+
   /**
    * Validate the logic for grouping log segments together for cleaning
    */
@@ -1618,16 +1648,17 @@ class LogCleanerTest extends Logging {
     // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
     assertThrows(classOf[LogCleaningAbortedException], () =>
       cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
-        new CleanedTransactionMetadata, -1)
+        new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
     )
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
     assertEquals(allKeys, LogTestUtils.keysInLog(log))
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
 
     // Clean each segment now that split is complete.
+    val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
     for (segmentToClean <- log.logSegments.asScala)
       cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
-        new CleanedTransactionMetadata, -1)
+        new CleanedTransactionMetadata, -1, upperBoundOffset)
     assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
     log.close()
@@ -1666,9 +1697,11 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
 
+    val upperBoundOffset = log.activeSegment.baseOffset
+
     // clean the log
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1684,7 +1717,7 @@ class LogCleanerTest extends Logging {
 
     // clean again
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1701,7 +1734,7 @@ class LogCleanerTest extends Logging {
 
     // clean again
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1723,7 +1756,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1741,7 +1774,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1759,7 +1792,7 @@ class LogCleanerTest extends Logging {
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
-      new CleanedTransactionMetadata, -1)
+      new CleanedTransactionMetadata, -1, upperBoundOffset)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
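
Note for readers skimming the patch: the core of the fix is the extra BatchRetention branch added to cleanInto above. A minimal standalone sketch of that decision follows; it is illustrative only, and the names decideRetention, BatchInfo, and RetentionSketch are invented for this sketch rather than part of the LogCleaner API.

    // Sketch of the retention decision introduced by this patch. The real logic lives in
    // LogCleaner.cleanInto via RecordFilter/BatchRetention; this is a simplified stand-in.
    object RetentionSketch {
      sealed trait BatchRetention
      case object RetainEmpty extends BatchRetention   // keep the batch header even if all records are removed
      case object Delete extends BatchRetention        // drop the batch entirely
      case object DeleteEmpty extends BatchRetention   // drop the batch only if it ends up empty

      // Simplified view of a record batch, carrying only the fields this sketch needs (assumed, not the real API).
      final case class BatchInfo(hasProducerId: Boolean,
                                 isLastBatchOfProducer: Boolean,
                                 nextOffset: Long,
                                 allRecordsDiscarded: Boolean)

      def decideRetention(batch: BatchInfo, upperBoundOffsetOfCleaningRound: Long): BatchRetention =
        if (batch.hasProducerId && batch.isLastBatchOfProducer)
          RetainEmpty // preserve producer state across cleaning
        else if (batch.nextOffset == upperBoundOffsetOfCleaningRound)
          // new branch from this patch: keep the final batch of the cleaning round, even if empty,
          // so the log end offset is not lost after compaction
          RetainEmpty
        else if (batch.allRecordsDiscarded)
          Delete
        else
          DeleteEmpty
    }

Retaining the last batch as an empty batch keeps its offset metadata on disk, which is why logEndOffset stays correct after compaction; this is also what the new testCleanSegmentsRetainingLastEmptyBatch asserts.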
