[ https://issues.apache.org/jira/browse/KAFKA-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990086#comment-15990086 ]
Davor Poldrugo commented on KAFKA-5055:
---------------------------------------

Hi guys!
I think the problem is caused by a bug in the method {{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}:

{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    final int oldQueueSize = partitionGroup.numBuffered();
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    return newQueueSize - oldQueueSize;
}
{code}

The problematic line is {{final int oldQueueSize = partitionGroup.numBuffered();}}. Instead of reading the old queue size of the current TopicPartition, it reads the buffered size across all partitions, because {{partitionGroup.numBuffered()}} returns the total queue size over every partition. {{newQueueSize}}, on the other hand, is the size of this partition's queue only, so {{return newQueueSize - oldQueueSize}} usually yields a negative value: the sum of the queue sizes across all partitions is almost always bigger than the queue size of this single partition.

The negative value propagates back to {{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}}:

{code:java}
for (TopicPartition partition : records.partitions()) {
    StreamTask task = activeTasksByPartition.get(partition);
    numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
{code}

There the negative return values are summed into {{numAddedRecords}}, actually decreasing it, which leads to a wrong sensor value in {{records.count() - numAddedRecords}}: {{numAddedRecords}} is smaller because of the bug, not because of invalid timestamps, so the skipped-records-rate sensor reports nonzero values.

BugFix proposal: in the method {{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}} change
{{final int oldQueueSize = partitionGroup.numBuffered();}}
to
{{final int oldQueueSize = partitionGroup.numBuffered(partition);}}
so that the old and the new size are read for the same partition and the returned delta equals the number of records actually added. A small worked example and a sketch of the patched method follow the quoted issue below.

If this is the cause of the bug, can I do a pull request?

> Kafka Streams skipped-records-rate sensor producing nonzero values even when
> FailOnInvalidTimestamp is used as extractor
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5055
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5055
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nikki Thean
>            Assignee: Guozhang Wang
>
> According to the code and the documentation for this metric, the only reason
> for a skipped record is an invalid timestamp, except that a) I am reading
> from a topic that is populated solely by Kafka Connect and b) I am using
> `FailOnInvalidTimestamp` as the timestamp extractor.
> Either I'm missing something in the documentation (i.e. another reason for
> skipped records) or there is a bug in the code that calculates this metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
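To make the arithmetic concrete, here is a minimal standalone sketch with made-up queue sizes. The class and variable names are illustrative only, not Kafka Streams API calls; it just replays the subtraction the buggy code performs:

{code:java}
// Hypothetical numbers (not Kafka Streams API code) showing why the delta
// returned by the buggy addRecords() goes negative and inflates the sensor.
public class SkippedRecordsBugDemo {
    public static void main(String[] args) {
        // Suppose the task currently buffers three partitions with 4, 3 and 3 records.
        int totalBufferedAllPartitions = 4 + 3 + 3; // what numBuffered() returns: 10
        int thisPartitionOldSize = 4;               // what numBuffered(partition) would return

        // A poll() delivers 2 new records for this partition.
        int recordsCount = 2;
        int newQueueSizeForPartition = thisPartitionOldSize + recordsCount; // 6

        // Buggy delta: per-partition "new" size minus cross-partition "old" size.
        int buggyDelta = newQueueSizeForPartition - totalBufferedAllPartitions; // 6 - 10 = -4

        // Correct delta: both sizes read for the same partition.
        int correctDelta = newQueueSizeForPartition - thisPartitionOldSize; // 2

        // The sensor records records.count() - numAddedRecords:
        System.out.println("skipped (buggy):   " + (recordsCount - buggyDelta));   // 2 - (-4) = 6
        System.out.println("skipped (correct): " + (recordsCount - correctDelta)); // 2 - 2 = 0
    }
}
{code}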
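And a sketch of {{StreamTask#addRecords}} with the proposed one-line fix applied. This assumes {{PartitionGroup}} exposes the per-partition overload {{numBuffered(TopicPartition)}} that the proposal above relies on; the rest of the method is unchanged:

{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    // Read the old size of the SAME partition whose new size is computed below,
    // so the returned delta is the number of records actually added.
    final int oldQueueSize = partitionGroup.numBuffered(partition);
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    return newQueueSize - oldQueueSize;
}
{code}

With this change, {{numAddedRecords}} in {{StreamThread#runLoop}} sums to exactly {{records.count()}} when nothing is dropped, so the skipped-records sensor should record zero unless the timestamp extractor actually skips a record.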