This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 64b8e17  KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)
64b8e17 is described below

commit 64b8e17827251174490678dd296ce2c1a79ff5ef
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Fri Aug 13 15:35:59 2021 -0700

    KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None (#11199)

    When the high watermark is contained in a non-active segment, we do not
    correctly bound cleaning by the HWM. This means that uncommitted records
    may overwrite committed data.

    I've separated out the bounding point tests to check the HWM case in
    addition to the existing active segment case.

    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 11 ++++----
 .../log/AbstractLogCleanerIntegrationTest.scala    |  2 ++
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  2 ++
 .../kafka/log/LogCleanerLagIntegrationTest.scala   |  2 ++
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 30 +++++++++++++++++++++-
 .../LogCleanerParameterizedIntegrationTest.scala   |  6 +++++
 6 files changed, 47 insertions(+), 6 deletions(-)
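The fix below relies on how the last stable offset (LSO) relates to the high watermark (HWM). A minimal standalone sketch of that rule, simplified from what kafka.log.Log does internally (not the real API):

    // Sketch only: the LSO is the first unstable offset when one exists below the
    // high watermark; otherwise it falls back to the high watermark. Bounding
    // cleaning by the LSO therefore also bounds it by the HWM, even when
    // firstUnstableOffsetMetadata is None.
    def lastStableOffset(firstUnstableOffset: Option[Long], highWatermark: Long): Long =
      firstUnstableOffset match {
        case Some(offset) if offset < highWatermark => offset
        case _ => highWatermark
      }

    // No open transactions: the LSO falls back to the HWM, not the log end offset.
    assert(lastStableOffset(None, highWatermark = 50L) == 50L)
    // Open transaction below the HWM: the LSO is the first unstable offset.
    assert(lastStableOffset(Some(30L), highWatermark = 50L) == 30L)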
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7e14184..c0030b1 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -590,13 +590,14 @@ private[log] object LogCleanerManager extends Logging {
 
     val minCompactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
-    // find first segment that cannot be cleaned
-    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
-    // may be cleaned
+    // Find the first segment that cannot be cleaned. We cannot clean past:
+    // 1. The active segment
+    // 2. The last stable offset (including the high watermark)
+    // 3. Any segments closer to the head of the log than the minimum compaction lag time
     val firstUncleanableDirtyOffset: Long = Seq(
 
-      // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset,
+      // we do not clean beyond the last stable offset
+      Some(log.lastStableOffset),
 
       // the active segment is always uncleanable
      Option(log.activeSegment.baseOffset),
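The patched computation takes the minimum of several optional bounds. A simplified sketch of the idea with a hypothetical signature (the real logic lives in LogCleanerManager.cleanableOffsets, shown above):

    // Each candidate bound is optional; the tightest (smallest) defined bound wins.
    def firstUncleanableDirtyOffset(lastStableOffset: Long,
                                    activeSegmentBaseOffset: Long,
                                    minLagBoundedOffset: Option[Long]): Long =
      Seq(
        Some(lastStableOffset),        // never clean past the LSO (and thus the HWM)
        Some(activeSegmentBaseOffset), // the active segment is always uncleanable
        minLagBoundedOffset            // min.compaction.lag.ms bound, when configured
      ).flatten.min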
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 2fc7942..ba10336 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -139,6 +139,8 @@ abstract class AbstractLogCleanerIntegrationTest {
       val value = counter.toString
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
         key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, value, appendInfo.firstOffset.get.messageOffset)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 16b219c..5a65fe3 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -192,6 +192,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
       val curValue = valCounter
       log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
         key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       valCounter += step
       (key, curValue)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index c077542..7e0a33d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,6 +106,8 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
         key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       incCounter()
       (key, count)
     }
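The three integration-test changes above are the same one-liner because appends alone only advance the log end offset, never the HWM. Under the fix, a stale HWM pins the LSO, and with it the cleaning bound, at its old value, so these tests would otherwise never compact anything. Illustrated with the lastStableOffset sketch from earlier (hypothetical offsets):

    // Before updateHighWatermark: HWM (and hence LSO) is still 0, so the cleaner
    // is not allowed to compact anything even though records were appended.
    assert(lastStableOffset(None, highWatermark = 0L) == 0L)
    // After log.updateHighWatermark(log.logEndOffset): the bound follows the log end.
    assert(lastStableOffset(None, highWatermark = 100L) == 100L)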
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 93d9713..e20e661 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -529,7 +529,7 @@ class LogCleanerManagerTest extends Logging {
   }
 
   /**
-   * Test computation of cleanable range with no minimum compaction lag settings active
+   * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO
    */
   @Test
   def testCleanableOffsetsForNone(): Unit = {
@@ -541,6 +541,30 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+    log.updateHighWatermark(50)
+
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+    assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
+    assertEquals(log.lastStableOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the last stable offset.")
+  }
+
+  /**
+   * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment
+   */
+  @Test
+  def testCleanableOffsetsActiveSegment(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 8)
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
@@ -571,6 +595,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
@@ -594,6 +620,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), leaderEpoch = 0)
 
+    log.updateHighWatermark(log.logEndOffset)
+
     time.sleep(compactionLag + 1)
 
     val lastCleanOffset = Some(0L)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 1471ff1..70ac47e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -66,6 +66,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
@@ -166,6 +168,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+      // move LSO forward to increase compaction bound
+      log.updateHighWatermark(log.logEndOffset)
       val largeMessageOffset = appendInfo.firstOffset.map(_.messageOffset).get
 
       // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
@@ -311,6 +315,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     }
 
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
     val offsets = appendInfo.firstOffset.get.messageOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
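Taken together, the two new LogCleanerManagerTest cases pin down both sides of the bound. Restated with the sketch helpers above (illustrative offsets, not values from an actual test run):

    // HWM (50) sits in a non-active segment: the LSO, not the active segment, bounds cleaning.
    val activeSegmentBase = 80L
    assert(firstUncleanableDirtyOffset(lastStableOffset(None, 50L), activeSegmentBase, None) == 50L)
    // HWM at the log end (100): the active segment base offset is the tighter bound.
    assert(firstUncleanableDirtyOffset(lastStableOffset(None, 100L), activeSegmentBase, None) == 80L)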