[ 
https://issues.apache.org/jira/browse/KAFKA-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15700688#comment-15700688
 ] 

Michael Schiff commented on KAFKA-4451:
---------------------------------------

After some further exploration:


{code:title=ReplicaFetcherThread.scala:131}
      replica.log.get.append(messageSet, assignOffsets = false)
{code}

{code:title=Log.scala:405}
        // maybe roll the log if this segment is full
        val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
                                maxTimestampInMessages = 
appendInfo.maxTimestamp)
{code}

maybeRoll depends on AbtractIndex.isFulll
{code:title=AbstractIndex.scala:75}
  /**
   * The maximum number of entries this index can hold
   */
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  /** The number of entries in this index */
  @volatile
  protected var _entries = mmap.position / entrySize

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = _entries >= _maxEntries
{code}

It appears the logic for whether or not to roll the segment depends on the 
absolute number of messages in the index and not the range of offsets stored in 
the index.  This allows the index to be overflown in the case of a heavily 
compacted topic.

Protective logic for the same issue is present in LogCleaner.groupSegmentsBySize
{code:title=LogCleaner.scala:546}
  /**
   * Group the segments in a log into groups totaling less than a given size. 
the size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single
   * destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in 
a group
   * @param maxIndexSize the maximum size in bytes for the total of all index 
data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: 
Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
    var grouped = List[List[LogSegment]]()
    var segs = segments.toList
    while(segs.nonEmpty) {
      var group = List(segs.head)
      var logSize = segs.head.size
      var indexSize = segs.head.index.sizeInBytes
      var timeIndexSize = segs.head.timeIndex.sizeInBytes
      segs = segs.tail
      while(segs.nonEmpty &&
            logSize + segs.head.size <= maxSize &&
            indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
            timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
            segs.head.index.lastOffset - group.last.index.baseOffset <= 
Int.MaxValue) {
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += segs.head.index.sizeInBytes
        timeIndexSize += segs.head.timeIndex.sizeInBytes
        segs = segs.tail
      }
      grouped ::= group.reverse
    }
    grouped.reverse
  }
{code}
specifically:
{code:title=LogCleaner.scala:559}
            segs.head.index.lastOffset - group.last.index.baseOffset <= 
Int.MaxValue) {
{code}

> Recovering empty replica yields negative offsets in index of compact 
> partitions
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-4451
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4451
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Michael Schiff
>             Fix For: 0.9.0.1
>
>
> Bringing up an empty broker.  
> the partition for a compact topic is not split into multiple log files.  All 
> data is written into a single log file, causing offsets to overflow.
> A dump of the affected broker shortly after it started replicating:
> {code}
> michael.schiff@stats-kafka09:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/00000000000000000000.index | head -n 10
> Dumping /kafka/attainment_event-0/00000000000000000000.index
> offset: 1022071124 position: 1037612
> offset: -1713432120 position: 1348740
> offset: -886291423 position: 2397130
> offset: -644750126 position: 3445630
> offset: -57889876 position: 4493972
> offset: 433950099 position: 5388461
> offset: 1071769472 position: 6436837
> offset: 1746859069 position: 7485367
> offset: 2090359736 position: 8533822
> ...
> {code}
> and the dump of the same log file from the leader of this partition
> {code}
> michael.schiff@stats-kafka12:~$ sudo /opt/kafka/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments --files 
> /kafka/attainment_event-0/00000000000000000000.index
> [sudo] password for michael.schiff:
> Dumping /kafka/attainment_event-0/00000000000000000000.index
> offset: 353690666 position: 262054
> offset: 633140428 position: 523785
> offset: 756537951 position: 785815
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to