This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8c292c [KAFKA-9826] Handle an unaligned first dirty offset during
log cleaning. (#8469)
b8c292c is described below
commit b8c292c36187d3feda8ae0ce22d58604115b8507
Author: Steve Rodrigues <[email protected]>
AuthorDate: Tue Apr 14 22:27:50 2020 -0700
[KAFKA-9826] Handle an unaligned first dirty offset during log cleaning.
(#8469)
In KAFKA-9826, a log whose first dirty offset was past the start of the
active segment and past the last cleaned point resulted in an endless cycle of
picking the segment to clean and discarding it. Though this didn't interfere
with cleaning other log segments, it kept the log cleaner thread continuously
busy (potentially wasting CPU and impacting other running threads) and filled
the logs with lots of extraneous messages.
The cause was twofold: the active segment was mistakenly being picked for
cleaning, and the logSegments code handles the (start == end) case correctly
only when the offset lies on a segment boundary. The intent is to return an
empty list, but if the offset is not on a segment boundary, the routine returns
the segment that contains it.
This fix has two parts:
1. It changes logSegments to handle start == end by always returning an empty
List.
2. It changes the definition of calculateCleanableBytes to not consider
anything past the firstUncleanableOffset; previously, it would potentially
shift the upper bound to match the firstDirtyOffset even if the
firstDirtyOffset was past the firstUncleanableOffset. This has no real effect
now in the context of the fix for (1), but it makes the code read more like the
model that the code is attempting to follow.
These changes require modifications to a few test cases that exercised this
particular scenario; they were introduced in the context of KAFKA-8764. Those
situations are now handled elsewhere in the code, but the tests themselves
allowed a firstDirtyOffset in the active segment, and expected the active
segment to be selected for cleaning.
Reviewer: Jun Rao <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 23 ++++++++------
.../main/scala/kafka/log/LogCleanerManager.scala | 2 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 10 ++++---
core/src/test/scala/unit/kafka/log/LogTest.scala | 35 ++++++++++++++++++++++
4 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index b1a0f05..a8148f6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2124,17 +2124,22 @@ class Log(@volatile private var _dir: File,
/**
* Get all segments beginning with the segment that includes "from" and
ending with the segment
- * that includes up to "to-1" or the end of the log (if to > logEndOffset)
+ * that includes up to "to-1" or the end of the log (if to > logEndOffset).
*/
def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
- lock synchronized {
- val view = Option(segments.floorKey(from)).map { floor =>
- if (to < floor)
- throw new IllegalArgumentException(s"Invalid log segment range:
requested segments in $topicPartition " +
- s"from offset $from mapping to segment with base offset $floor,
which is greater than limit offset $to")
- segments.subMap(floor, to)
- }.getOrElse(segments.headMap(to))
- view.values.asScala
+ if (from == to) {
+ // Handle non-segment-aligned empty sets
+ List.empty[LogSegment]
+ } else if (to < from) {
+ throw new IllegalArgumentException(s"Invalid log segment range:
requested segments in $topicPartition " +
+ s"from offset $from which is greater than limit offset $to")
+ } else {
+ lock synchronized {
+ val view = Option(segments.floorKey(from)).map { floor =>
+ segments.subMap(floor, to)
+ }.getOrElse(segments.headMap(to))
+ view.values.asScala
+ }
}
}
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f9ea46b..473f1fb 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -599,7 +599,7 @@ private[log] object LogCleanerManager extends Logging {
def calculateCleanableBytes(log: Log, firstDirtyOffset: Long,
uncleanableOffset: Long): (Long, Long) = {
val firstUncleanableSegment =
log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
- val cleanableBytes = log.logSegments(firstDirtyOffset,
math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+ val cleanableBytes = log.logSegments(math.min(firstDirtyOffset,
firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum
(firstUncleanableOffset, cleanableBytes)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index e5cf6ee..448fcb7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -237,8 +237,9 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 0L)
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
- assertEquals(2L, filthiestLog.firstDirtyOffset)
+ // The active segment is uncleanable and hence not filthy from the POV of
the CleanerManager.
+ val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+ assertEquals(None, filthiestLog)
}
@Test
@@ -262,8 +263,9 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 3L)
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
- assertEquals(3L, filthiestLog.firstDirtyOffset)
+ // These segments are uncleanable and hence not filthy
+ val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+ assertEquals(None, filthiestLog)
}
/**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0cc19dd..12f0b1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -469,6 +469,41 @@ class LogTest {
assertEquals(101L, log.logEndOffset)
}
+ /**
+ * Test the values returned by the logSegments call
+ */
+ @Test
+ def testLogSegmentsCallCorrect(): Unit = {
+ // Create 3 segments and make sure we get the right values from various
logSegments calls.
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds)
+ def getSegmentOffsets(log :Log, from: Long, to: Long) =
log.logSegments(from, to).map { _.baseOffset }
+ val setSize = createRecords.sizeInBytes
+ val msgPerSeg = 10
+ val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
+ // create a log
+ val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+ val log = createLog(logDir, logConfig)
+ assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+ // segments expire in size
+ for (_ <- 1 to (2 * msgPerSeg + 2))
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ assertEquals("There should be exactly 3 segments.", 3,
log.numberOfSegments)
+
+ // from == to should always be null
+ assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 10, 10))
+ assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 15, 15))
+
+ assertEquals(List[Long](0, 10, 20), getSegmentOffsets(log, 0, 21))
+
+ assertEquals(List[Long](0), getSegmentOffsets(log, 1, 5))
+ assertEquals(List[Long](10, 20), getSegmentOffsets(log, 13, 21))
+ assertEquals(List[Long](10), getSegmentOffsets(log, 13, 17))
+
+ // from < to is bad
+ assertThrows[IllegalArgumentException]({ log.logSegments(10, 0) })
+ }
+
@Test
def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
// simulate the upgrade path by creating a new log with several segments,
deleting the