Hi all,
We found that our consumers have a high CPU load in our production
environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect how
often the consumer's fetch requests return, so we set both to very large
values so that the broker can rarely satisfy them.

That did not solve the problem, so we looked at the Kafka code and found that
DelayedFetch's tryComplete() function contains this code:
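
For reference, a minimal sketch of how the two settings above could be put
into a consumer Properties object. The object name, the helper and the values
in the usage line are made up for illustration, and the other settings a real
consumer needs are left out.

```scala
import java.util.Properties

// Hypothetical helper, not part of Kafka: collects the two fetch settings
// discussed in this mail into a Properties object.
object ConsumerFetchSettings {
  def build(minBytes: Int, maxWaitMs: Int): Properties = {
    val props = new Properties()
    // Minimum amount of data the broker should gather before answering a fetch.
    props.setProperty("fetch.min.bytes", minBytes.toString)
    // Longest time the broker may hold the fetch when that minimum is not reached.
    props.setProperty("fetch.wait.max.ms", maxWaitMs.toString)
    props
  }
}
```

For example, ConsumerFetchSettings.build(1048576, 10000) asks for at least
1 MiB per fetch and allows the broker to wait up to 10 seconds. Both numbers
are illustrative only, not the values we actually used.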

    if (endOffset.messageOffset != fetchOffset.messageOffset) {
      if (endOffset.onOlderSegment(fetchOffset)) {
        // Case C, this can happen when the new fetch operation is on a truncated leader
        debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
        return forceComplete()
      } else if (fetchOffset.onOlderSegment(endOffset)) {
        // Case C, this can happen when the fetch operation is falling behind the current segment
        // or the partition has just rolled a new segment
        debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
        return forceComplete()
      } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
        // we need take the partition fetch size as upper bound when accumulating the bytes
        accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
      }
    }

From this we concluded that our fetchOffset's segmentBaseOffset is not the
same as endOffset's segmentBaseOffset. We then checked the segments of our
topic-partition and found that all the data in them had been removed by Kafka
because of log.retention. Our guess is that fetchOffset's segmentBaseOffset
being smaller than endOffset's segmentBaseOffset is what causes the problem.
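
To make the branch logic easier to follow, here is a small self-contained
model of the per-partition decision in the excerpt above. It is only a
sketch: OffsetMeta, Decision and DelayedFetchModel are stand-ins invented for
this mail, not the real Kafka classes, and onOlderSegment is assumed to
compare segmentBaseOffset values only.

```scala
// Simplified stand-in for the broker's offset metadata (not the real class).
final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, positionInSegment: Int) {
  // Assumption: an offset is "on an older segment" when its segment starts earlier.
  def onOlderSegment(that: OffsetMeta): Boolean = segmentBaseOffset < that.segmentBaseOffset
  // Byte distance between two positions inside the same segment.
  def positionDiff(that: OffsetMeta): Int = positionInSegment - that.positionInSegment
}

sealed trait Decision
// The fetch is answered right away, whatever fetch.min.bytes says.
case object CompleteNow extends Decision
// The fetch keeps waiting; `bytes` is what this partition adds to accumulatedSize.
final case class Accumulate(bytes: Int) extends Decision

object DelayedFetchModel {
  def decide(fetchOffset: OffsetMeta, endOffset: OffsetMeta, fetchSize: Int): Decision =
    if (endOffset.messageOffset == fetchOffset.messageOffset) Accumulate(0)
    else if (endOffset.onOlderSegment(fetchOffset)) CompleteNow // truncated leader
    else if (fetchOffset.onOlderSegment(endOffset)) CompleteNow // fetch is behind the active segment
    else if (fetchOffset.messageOffset < endOffset.messageOffset)
      Accumulate(math.min(endOffset.positionDiff(fetchOffset), fetchSize))
    else Accumulate(0)
}
```

In this model, a fetch at OffsetMeta(100, 0, 500) against a log end of
OffsetMeta(200, 150, 40) yields CompleteNow, because the fetch offset sits in
a segment with a smaller base offset. That is the situation we suspect we are
in, and in it the byte count is never compared with fetch.min.bytes at all.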

My question is: should we use code like the following instead, so that the
client uses less CPU?
    if (endOffset.messageOffset != fetchOffset.messageOffset) {
      if (endOffset.onOlderSegment(fetchOffset)) {
        return false
      } else if (fetchOffset.onOlderSegment(endOffset)) {
        return false
      }
    }

With this change, the broker would respond after fetch.wait.max.ms in this
scenario instead of returning immediately.

Feedback is greatly appreciated. Thanks.

        
