This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8c292c  [KAFKA-9826] Handle an unaligned first dirty offset during 
log cleaning. (#8469)
b8c292c is described below

commit b8c292c36187d3feda8ae0ce22d58604115b8507
Author: Steve Rodrigues <[email protected]>
AuthorDate: Tue Apr 14 22:27:50 2020 -0700

    [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469)
    
    In KAFKA-9826, a log whose first dirty offset was past the start of the
    active segment and past the last cleaned point caused an endless cycle:
    the cleaner kept picking that segment for cleaning and then discarding it.
    Although this did not interfere with cleaning other log segments, it kept
    the log cleaner thread continuously busy (potentially wasting CPU and
    impacting other running threads) and filled the log output with many
    extraneous messages.
    
    The root cause was twofold. First, the active segment was mistakenly being
    picked for cleaning. Second, logSegments handled the (from == to) case
    correctly only when the offsets fell on a segment boundary. The intent is to
    return an empty list, but when the offsets are not on a segment boundary,
    the routine returned the segment containing them.
    
    This fix has two parts:
    
    It changes logSegments to always return an empty list when from == to.
    
    It changes calculateCleanableBytes so that it never counts anything past
    the first uncleanable offset. Previously, when firstDirtyOffset was past
    firstUncleanableOffset, the upper bound of the range was effectively raised
    to firstDirtyOffset. Given the logSegments change, this has no practical
    effect now, but it makes the code read more like the model it is meant to
    follow.
    
    These changes required modifying a few test cases that exercised this
    particular scenario; they were introduced in the context of KAFKA-8764.
    Those situations are now handled elsewhere in the code. However, the tests
    themselves allowed a first dirty offset in the active segment and expected
    the active segment to be selected for cleaning.
    
    Reviewer: Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala            | 23 ++++++++------
 .../main/scala/kafka/log/LogCleanerManager.scala   |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 10 ++++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 35 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index b1a0f05..a8148f6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2124,17 +2124,22 @@ class Log(@volatile private var _dir: File,
 
   /**
    * Get all segments beginning with the segment that includes "from" and 
ending with the segment
-   * that includes up to "to-1" or the end of the log (if to > logEndOffset)
+   * that includes up to "to-1" or the end of the log (if to > logEndOffset).
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
-    lock synchronized {
-      val view = Option(segments.floorKey(from)).map { floor =>
-        if (to < floor)
-          throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments in $topicPartition " +
-            s"from offset $from mapping to segment with base offset $floor, 
which is greater than limit offset $to")
-        segments.subMap(floor, to)
-      }.getOrElse(segments.headMap(to))
-      view.values.asScala
+    if (from == to) {
+      // Handle non-segment-aligned empty sets
+      List.empty[LogSegment]
+    } else if (to < from) {
+      throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments in $topicPartition " +
+        s"from offset $from which is greater than limit offset $to")
+    } else {
+      lock synchronized {
+        val view = Option(segments.floorKey(from)).map { floor =>
+          segments.subMap(floor, to)
+        }.getOrElse(segments.headMap(to))
+        view.values.asScala
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f9ea46b..473f1fb 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -599,7 +599,7 @@ private[log] object LogCleanerManager extends Logging {
   def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, 
uncleanableOffset: Long): (Long, Long) = {
     val firstUncleanableSegment = 
log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
     val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-    val cleanableBytes = log.logSegments(firstDirtyOffset, 
math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+    val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, 
firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum
 
     (firstUncleanableOffset, cleanableBytes)
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index e5cf6ee..448fcb7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -237,8 +237,9 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 0L)
 
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
-    assertEquals(2L, filthiestLog.firstDirtyOffset)
+    // The active segment is uncleanable and hence not filthy from the POV of 
the CleanerManager.
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals(None, filthiestLog)
   }
 
   @Test
@@ -262,8 +263,9 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 3L)
 
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
-    assertEquals(3L, filthiestLog.firstDirtyOffset)
+    // These segments are uncleanable and hence not filthy
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals(None, filthiestLog)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0cc19dd..12f0b1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -469,6 +469,41 @@ class LogTest {
     assertEquals(101L, log.logEndOffset)
   }
 
+  /**
+   * Test the values returned by the logSegments call
+   */
+  @Test
+  def testLogSegmentsCallCorrect(): Unit = {
+    // Create 3 segments and make sure we get the right values from various 
logSegments calls.
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds)
+    def getSegmentOffsets(log :Log, from: Long, to: Long) = 
log.logSegments(from, to).map { _.baseOffset }
+    val setSize = createRecords.sizeInBytes
+    val msgPerSeg = 10
+    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
+    // create a log
+    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+    val log = createLog(logDir, logConfig)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    // segments expire in size
+    for (_ <- 1 to (2 * msgPerSeg + 2))
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    assertEquals("There should be exactly 3 segments.", 3, 
log.numberOfSegments)
+
+    // from == to should always be null
+    assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 10, 10))
+    assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 15, 15))
+
+    assertEquals(List[Long](0, 10, 20), getSegmentOffsets(log, 0, 21))
+
+    assertEquals(List[Long](0), getSegmentOffsets(log, 1, 5))
+    assertEquals(List[Long](10, 20), getSegmentOffsets(log, 13, 21))
+    assertEquals(List[Long](10), getSegmentOffsets(log, 13, 17))
+
+    // from < to is bad
+    assertThrows[IllegalArgumentException]({ log.logSegments(10, 0) })
+  }
+
   @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, 
deleting the

Reply via email to