This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 4e5b864 KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662)
4e5b864 is described below
commit 4e5b86419982050217d06b3c30ba6236e1fd9090
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Nov 8 19:35:22 2019 -0800
KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662)
This was a regression in 2.3.1. In the case of a DeleteRecords call, the
log start offset may be higher than the active segment base offset. The cleaner
should allow for this case gracefully.
Reviewers: Jun Rao <[email protected]>
Co-Authored-By: Tim Van Laer <[email protected]>
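For context, the behavioral change in Log.nonActiveLogSegmentsFrom can be illustrated with a minimal standalone sketch. This is not the patched Kafka code itself: Segment and the TreeMap of base offsets below are simplified, hypothetical stand-ins, and the real method additionally floors the start offset to a segment base offset and runs under the log lock.

    import scala.collection.immutable.TreeMap

    object NonActiveSegmentsSketch {
      final case class Segment(baseOffset: Long)

      // Segments with base offsets 0, 3 and 6; the highest one plays the role of the active segment.
      private val segments = TreeMap(0L -> Segment(0L), 3L -> Segment(3L), 6L -> Segment(6L))
      private def activeSegment: Segment = segments.last._2

      // Before this commit, a `from` beyond the active segment's base offset threw
      // IllegalArgumentException; with the fix it simply yields no segments.
      def nonActiveLogSegmentsFrom(from: Long): Iterable[Segment] =
        if (from > activeSegment.baseOffset) Seq.empty
        else segments.range(from, activeSegment.baseOffset).values

      def main(args: Array[String]): Unit = {
        println(nonActiveLogSegmentsFrom(0L).map(_.baseOffset)) // base offsets 0 and 3
        println(nonActiveLogSegmentsFrom(7L).map(_.baseOffset)) // empty, e.g. after DeleteRecords advanced the log start offset past 6
      }
    }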
---
core/src/main/scala/kafka/log/Log.scala | 10 +--
core/src/main/scala/kafka/log/LogCleaner.scala | 13 +--
.../main/scala/kafka/log/LogCleanerManager.scala | 43 +++++++---
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 4 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 99 ++++++++++++++++------
core/src/test/scala/unit/kafka/log/LogTest.scala | 1 +
6 files changed, 111 insertions(+), 59 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 491a4c4..3291e1d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1918,8 +1918,8 @@ class Log(@volatile var dir: File,
lock synchronized {
val view = Option(segments.floorKey(from)).map { floor =>
if (to < floor)
- throw new IllegalArgumentException(s"Invalid log segment range: requested segments from offset $from " +
- s"mapping to segment with base offset $floor, which is greater than limit offset $to")
+ throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
+ s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to")
segments.subMap(floor, to)
}.getOrElse(segments.headMap(to))
view.values.asScala
@@ -1929,9 +1929,9 @@ class Log(@volatile var dir: File,
def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
lock synchronized {
if (from > activeSegment.baseOffset)
- throw new IllegalArgumentException("Illegal request for non-active segments beginning at " +
- s"offset $from, which is larger than the active segment's base offset ${activeSegment.baseOffset}")
- logSegments(from, activeSegment.baseOffset)
+ Seq.empty
+ else
+ logSegments(from, activeSegment.baseOffset)
}
}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7e51ff4..47abae5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -435,17 +435,6 @@ object LogCleaner {
fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
}
- /**
- * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
- * @return the biggest uncleanable offset and the total amount of cleanable bytes
- */
- def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
- val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
- val firstUncleanableOffset = firstUncleanableSegment.baseOffset
- val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
-
- (firstUncleanableOffset, cleanableBytes)
- }
}
/**
@@ -1048,7 +1037,7 @@ private case class LogToClean(topicPartition: TopicPartition,
uncleanableOffset: Long,
needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
- val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
+ val (firstUncleanableOffset, cleanableBytes) = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
val totalBytes = cleanBytes + cleanableBytes
val cleanableRatio = cleanableBytes / totalBytes.toDouble
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index efa4d94..18bb871 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.KafkaStorageException
-import scala.collection.{Iterable, immutable, mutable}
+import scala.collection.{Iterable, Seq, mutable}
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -105,8 +105,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val now = Time.SYSTEM.milliseconds
partitions.map { tp =>
val log = logs.get(tp)
- val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, tp, lastClean, now)
- val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
+ val lastCleanOffset = lastClean.get(tp)
+ val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
+ val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
uncleanableBytes
}.sum
}
@@ -180,7 +181,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
- val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
+ val lastCleanOffset = lastClean.get(topicPartition)
+ val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
@@ -510,15 +512,11 @@ private[log] object LogCleanerManager extends Logging {
* Returns the range of dirty offsets that can be cleaned.
*
* @param log the log
- * @param lastClean the map of checkpointed offsets
+ * @param lastCleanOffset the last checkpointed offset
* @param now the current time in milliseconds of the cleaning operation
* @return the lower (inclusive) and upper (exclusive) offsets
*/
- def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
-
- // the checkpointed offset, ie., the first offset of the next dirty segment
- val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
-
+ def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): (Long, Long) = {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
// reset to the log starting offset and log the error
val firstDirtyOffset = {
@@ -534,7 +532,7 @@ private[log] object LogCleanerManager extends Logging {
} else if (checkpointDirtyOffset > log.logEndOffset) {
// The dirty offset has gotten ahead of the log end offset. This could happen if there was data
// corruption at the end of the log. We conservatively assume that the full log needs cleaning.
- warn(s"The last checkpoint dirty offset for partition $topicPartition is $checkpointDirtyOffset, " +
+ warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " +
s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.")
logStartOffset
} else {
@@ -561,14 +559,31 @@ private[log] object LogCleanerManager extends Logging {
val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.find { s =>
val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
- debug(s"Checking if log segment may be cleaned: log='${log.name}'
segment.baseOffset=${s.baseOffset}
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now -
minCompactionLagMs}; is uncleanable=$isUncleanable")
+ debug(s"Checking if log segment may be cleaned: log='${log.name}'
segment.baseOffset=${s.baseOffset} " +
+ s"segment.largestTimestamp=${s.largestTimestamp}; now -
compactionLag=${now - minCompactionLagMs}; " +
+ s"is uncleanable=$isUncleanable")
isUncleanable
}.map(_.baseOffset)
} else None
).flatten.min
- debug(s"Finding range of cleanable offsets for log=${log.name}
topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now =>
firstDirtyOffset=$firstDirtyOffset
firstUncleanableOffset=$firstUncleanableDirtyOffset
activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+ debug(s"Finding range of cleanable offsets for log=${log.name}. Last clean
offset=$lastCleanOffset " +
+ s"now=$now => firstDirtyOffset=$firstDirtyOffset
firstUncleanableOffset=$firstUncleanableDirtyOffset " +
+ s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+
+ (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset))
+ }
+
+ /**
+ * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
+ * @return the biggest uncleanable offset and the total amount of cleanable bytes
+ */
+ def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
+ val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
+ val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+ val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
- (firstDirtyOffset, firstUncleanableDirtyOffset)
+ (firstUncleanableOffset, cleanableBytes)
}
+
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 2d342fa..b189392 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -84,8 +84,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory)
TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L)
- val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
- LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
+ val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
+ LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes,
s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 83098bf..309dee8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -177,6 +177,54 @@ class LogCleanerManagerTest extends Logging {
assertEquals(10L, filthiestLog.firstDirtyOffset)
}
+ @Test
+ def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val log = createLog(segmentSize = 2048, LogConfig.Compact, tp)
+
+ val logs = new Pool[TopicPartition, Log]()
+ logs.put(tp, log)
+
+ appendRecords(log, numRecords = 3)
+ appendRecords(log, numRecords = 3)
+ appendRecords(log, numRecords = 3)
+
+ assertEquals(1, log.logSegments.size)
+
+ log.maybeIncrementLogStartOffset(2L)
+
+ val cleanerManager = createCleanerManagerMock(logs)
+ cleanerCheckpoints.put(tp, 0L)
+
+ val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+ assertEquals(2L, filthiestLog.firstDirtyOffset)
+ }
+
+ @Test
+ def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+ // It is possible in the case of an unclean leader election for the checkpoint
+ // dirty offset to get ahead of the active segment base offset, but still be
+ // within the range of the log.
+
+ val tp = new TopicPartition("foo", 0)
+
+ val logs = new Pool[TopicPartition, Log]()
+ val log = createLog(2048, LogConfig.Compact, topicPartition = tp)
+ logs.put(tp, log)
+
+ appendRecords(log, numRecords = 3)
+ appendRecords(log, numRecords = 3)
+
+ assertEquals(1, log.logSegments.size)
+ assertEquals(0L, log.activeSegment.baseOffset)
+
+ val cleanerManager = createCleanerManagerMock(logs)
+ cleanerCheckpoints.put(tp, 3L)
+
+ val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+ assertEquals(3L, filthiestLog.firstDirtyOffset)
+ }
+
/**
* When checking for logs with segments ready for deletion
* we shouldn't consider logs where cleanup.policy=delete
@@ -337,8 +385,8 @@ class LogCleanerManagerTest extends Logging {
while(log.numberOfSegments < 8)
log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
- val lastClean = Map(topicPartition -> 0L)
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+ val lastCleanOffset = Some(0L)
+ val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
}
@@ -367,8 +415,8 @@ class LogCleanerManagerTest extends Logging {
while (log.numberOfSegments < 8)
log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
- val lastClean = Map(topicPartition -> 0L)
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+ val lastCleanOffset = Some(0L)
+ val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
}
@@ -392,8 +440,8 @@ class LogCleanerManagerTest extends Logging {
time.sleep(compactionLag + 1)
- val lastClean = Map(topicPartition -> 0L)
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
+ val lastCleanOffset = Some(0L)
+ val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
}
@@ -420,8 +468,7 @@ class LogCleanerManagerTest extends Logging {
time.sleep(compactionLag + 1)
// although the compaction lag has been exceeded, the undecided data should not be cleaned
- var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
- Map(topicPartition -> 0L), time.milliseconds())
+ var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets._1)
assertEquals(0L, cleanableOffsets._2)
@@ -431,16 +478,14 @@ class LogCleanerManagerTest extends Logging {
log.onHighWatermarkIncremented(4L)
// the first segment should now become cleanable immediately
- cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
- Map(topicPartition -> 0L), time.milliseconds())
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets._1)
assertEquals(3L, cleanableOffsets._2)
time.sleep(compactionLag + 1)
// the second segment becomes cleanable after the compaction lag
- cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
- Map(topicPartition -> 0L), time.milliseconds())
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
assertEquals(0L, cleanableOffsets._1)
assertEquals(4L, cleanableOffsets._2)
}
@@ -531,26 +576,28 @@ class LogCleanerManagerTest extends Logging {
recordsPerBatch: Int,
batchesPerSegment: Int): Unit = {
for (i <- 0 until numBatches) {
- val startOffset = i * recordsPerBatch
- val endOffset = startOffset + recordsPerBatch
- var lastTimestamp = 0L
- val records = (startOffset until endOffset).map { offset =>
- val currentTimestamp = time.milliseconds()
- if (offset == endOffset - 1)
- lastTimestamp = currentTimestamp
-
- new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
- }
-
- log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
- log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset)
-
+ appendRecords(log, recordsPerBatch)
if (i % batchesPerSegment == 0)
log.roll()
}
log.roll()
}
+ private def appendRecords(log: Log, numRecords: Int): Unit = {
+ val startOffset = log.logEndOffset
+ val endOffset = startOffset + numRecords
+ var lastTimestamp = 0L
+ val records = (startOffset until endOffset).map { offset =>
+ val currentTimestamp = time.milliseconds()
+ if (offset == endOffset - 1)
+ lastTimestamp = currentTimestamp
+ new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
+ }
+
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
+ log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset)
+ }
+
private def makeLog(dir: File = logDir, config: LogConfig) =
Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a864da0..f62205f 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -355,6 +355,7 @@ class LogTest {
assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L))
assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L))
assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L))
+ assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(6L))
}
@Test