This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41e4e93  KAFKA-6429; LogCleanerManager.cleanableOffsets should create 
objects … (#4399)
41e4e93 is described below

commit 41e4e93b5ae8a7d221fce1733e050cb98ac9713c
Author: huxi <[email protected]>
AuthorDate: Fri Jan 26 01:49:50 2018 +0800

    KAFKA-6429; LogCleanerManager.cleanableOffsets should create objects … 
(#4399)
    
    …for dirty non-active segments only when 
`log.cleaner.min.compaction.lag.ms` is greater than 0
    
    With `log.cleaner.min.compaction.lag.ms`'s default value of 0, there is no 
need to hold heap objects for those dirty non-active segments. This could 
reduce the heap size and also avoid the unnecessary monitor lock retrieval.
---
 core/src/main/scala/kafka/log/LogCleanerManager.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b48b757..c3d3892 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -344,10 +344,7 @@ private[log] object LogCleanerManager extends Logging {
         offset
       }
     }
-
-    // dirty log segments
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, 
log.activeSegment.baseOffset)
-
+    
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
@@ -363,6 +360,8 @@ private[log] object LogCleanerManager extends Logging {
 
       // the first segment whose largest message timestamp is within a minimum 
time lag from now
       if (compactionLagMs > 0) {
+        // dirty log segments
+        val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, 
log.activeSegment.baseOffset)
         dirtyNonActiveSegments.find { s =>
           val isUncleanable = s.largestTimestamp > now - compactionLagMs
           debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} 
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - 
compactionLagMs}; is uncleanable=$isUncleanable")

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to