Repository: kafka
Updated Branches:
  refs/heads/trunk cf28f8939 -> dc54055d0


kafka-2235; LogCleaner offset map overflow; patched by Ivan Simoneko; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc54055d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc54055d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc54055d

Branch: refs/heads/trunk
Commit: dc54055d05742a4a7729a1fe1073c18e3d95cbb2
Parents: cf28f89
Author: Ivan Simoneko <simonenko....@gmail.com>
Authored: Mon Jun 22 09:19:45 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Jun 22 09:19:45 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc54055d/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d07a391..b36ea0d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -559,11 +559,17 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
-    for (segment <- dirty) {
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
+    var full = false
+    for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
+      val segmentSize = segment.nextOffset() - segment.baseOffset
+
+      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize,  log.name, segment.log.file.getName, maxDesiredMapSize))
+      if (map.size + segmentSize <= maxDesiredMapSize)
         offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      else
+        full = true
     }
     info("Offset map for log %s complete.".format(log.name))
     offset

Reply via email to