This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64b8e17  KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)
64b8e17 is described below

commit 64b8e17827251174490678dd296ce2c1a79ff5ef
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Fri Aug 13 15:35:59 2021 -0700

    KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)
    
    When the high watermark is contained in a non-active segment, cleaning is
    not correctly bounded by the hwm. This means that uncommitted records may
    overwrite committed data. I've separated out the bounding point tests to
    check the hwm case in addition to the existing active segment case.
    
    Reviewers: Jun Rao <jun...@gmail.com>
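
For illustration, the new bound can be sketched as the minimum of the candidate
upper bounds the cleaner must respect. This is a simplified, self-contained Scala
sketch, not the actual LogCleanerManager code; the parameter names
(lastStableOffset, activeSegmentBaseOffset, minCompactionLagBound) are stand-ins
for values the real code reads from the log:

    object CleaningBoundSketch {
      // The cleaner must stop at the smallest of these candidate upper bounds.
      def firstUncleanableDirtyOffset(lastStableOffset: Long,
                                      activeSegmentBaseOffset: Long,
                                      minCompactionLagBound: Option[Long]): Long =
        Seq(
          Some(lastStableOffset),        // never clean past the LSO (itself bounded by the HWM)
          Some(activeSegmentBaseOffset), // the active segment is always uncleanable
          minCompactionLagBound          // first segment still within the min compaction lag, if any
        ).flatten.min

      def main(args: Array[String]): Unit = {
        // e.g. HWM/LSO at 50 while the active segment starts at offset 120:
        // cleaning stops at 50, so records above the HWM are never compacted away.
        println(firstUncleanableDirtyOffset(50L, 120L, None)) // prints 50
      }
    }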
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 11 ++++----
 .../log/AbstractLogCleanerIntegrationTest.scala    |  2 ++
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  2 ++
 .../kafka/log/LogCleanerLagIntegrationTest.scala   |  2 ++
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 30 +++++++++++++++++++++-
 .../LogCleanerParameterizedIntegrationTest.scala   |  6 +++++
 6 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7e14184..c0030b1 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -590,13 +590,14 @@ private[log] object LogCleanerManager extends Logging {
 
     val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
-    // find first segment that cannot be cleaned
-    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
-    // may be cleaned
+    // Find the first segment that cannot be cleaned. We cannot clean past:
+    // 1. The active segment
+    // 2. The last stable offset (including the high watermark)
+    // 3. Any segments closer to the head of the log than the minimum compaction lag time
     val firstUncleanableDirtyOffset: Long = Seq(
 
-      // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset,
+      // we do not clean beyond the last stable offset
+      Some(log.lastStableOffset),
 
       // the active segment is always uncleanable
       Option(log.activeSegment.baseOffset),
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 2fc7942..ba10336 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -139,6 +139,8 @@ abstract class AbstractLogCleanerIntegrationTest {
       val value = counter.toString
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
         key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, value, appendInfo.firstOffset.get.messageOffset)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 16b219c..5a65fe3 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -192,6 +192,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
       val curValue = valCounter
       log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
         key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       valCounter += step
       (key, curValue)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index c077542..7e0a33d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,6 +106,8 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
               key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, count)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 93d9713..e20e661 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -529,7 +529,7 @@ class LogCleanerManagerTest extends Logging {
   }
 
   /**
-    * Test computation of cleanable range with no minimum compaction lag settings active
+    * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO
     */
   @Test
   def testCleanableOffsetsForNone(): Unit = {
@@ -541,6 +541,30 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+    log.updateHighWatermark(50)
+
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+    assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
+    assertEquals(log.lastStableOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the last stable offset.")
+  }
+
+  /**
+   * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment
+   */
+  @Test
+  def testCleanableOffsetsActiveSegment(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 8)
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
@@ -571,6 +595,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
@@ -594,6 +620,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     time.sleep(compactionLag + 1)
 
     val lastCleanOffset = Some(0L)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 1471ff1..70ac47e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -66,6 +66,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
@@ -166,6 +168,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       val largeMessageOffset = appendInfo.firstOffset.map(_.messageOffset).get
 
       // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
@@ -311,6 +315,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     }
 
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val offsets = appendInfo.firstOffset.get.messageOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
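
Why the test changes advance the high watermark: with the new bound, a log whose
high watermark never moves has lastStableOffset == 0, so the cleanable range would
be empty and these tests would never compact anything. Calling
log.updateHighWatermark(log.logEndOffset) after appending moves the HWM, and hence
the LSO (no transactions are in progress in these tests), up to the log end,
restoring the previous behaviour where cleaning is bounded only by the active
segment. In terms of the sketch above (illustrative numbers only):

    // HWM never advanced: everything is uncleanable
    firstUncleanableDirtyOffset(lastStableOffset = 0L, activeSegmentBaseOffset = 120L, minCompactionLagBound = None)   // 0
    // HWM advanced to the log end offset: bounded by the active segment again
    firstUncleanableDirtyOffset(lastStableOffset = 130L, activeSegmentBaseOffset = 120L, minCompactionLagBound = None) // 120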
