[ https://issues.apache.org/jira/browse/KAFKA-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990086#comment-15990086 ]

Davor Poldrugo commented on KAFKA-5055:
---------------------------------------

Hi guys!
I think the problem is caused by a bug in the method 
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}:

{code:java}
    public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        final int oldQueueSize = partitionGroup.numBuffered();
        final int newQueueSize = partitionGroup.addRawRecords(partition, records);

        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

        // if after adding these records, its partition queue's buffered size has been
        // increased beyond the threshold, we can then pause the consumption for this partition
        if (newQueueSize > this.maxBufferedSize) {
            consumer.pause(singleton(partition));
        }

        return newQueueSize - oldQueueSize;
    }
{code}

This line is the problem:
{{final int oldQueueSize = partitionGroup.numBuffered();}}

Instead of getting the {{oldQueueSize}} of the current TopicPartition, it gets 
the buffered size of all partitions, because {{partitionGroup.numBuffered()}} 
returns the total queue size across all partitions, while 
{{partitionGroup.addRawRecords(partition, records)}} returns the new size of 
that single partition's queue.

As a result, {{return newQueueSize - oldQueueSize}} usually returns a negative 
value, because the sum of the queue sizes across all partitions is almost 
always bigger than the queue size of this single partition, even after the new 
records are added.
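
To make the arithmetic concrete, here is a standalone sketch with made-up 
numbers (the class name and values are mine, not from the Kafka code):

{code:java}
// made-up scenario: partition p0 already buffers 4 records, partition p1
// buffers 6, and a poll delivers 3 new records for p0
public class NegativeDeltaExample {
    public static void main(String[] args) {
        int bufferedP0 = 4;
        int bufferedP1 = 6;
        int newRecordsForP0 = 3;

        // numBuffered() counts ALL partitions: 4 + 6 = 10
        int oldQueueSize = bufferedP0 + bufferedP1;
        // addRawRecords(p0, ...) returns only p0's new size: 4 + 3 = 7
        int newQueueSize = bufferedP0 + newRecordsForP0;

        // prints -3, although 3 records were actually added
        System.out.println(newQueueSize - oldQueueSize);
    }
}
{code}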

This goes back to 
{{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}}:
{code:java}
for (TopicPartition partition : records.partitions()) {
    StreamTask task = activeTasksByPartition.get(partition);
    numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
{code}

There the negative return value gets summed into {{numAddedRecords}}, actually 
decreasing it. This leads to a wrong sensor value in 
{{records.count() - numAddedRecords}}: {{numAddedRecords}} is smaller because 
of the bug, not because any record had an invalid timestamp.
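
Continuing the made-up numbers from the sketch above, the sensor ends up 
reporting skips that never happened:

{code:java}
// the poll returned 3 records, and addRecords(...) returned -3 for them
int recordsCount = 3;      // records.count()
int numAddedRecords = -3;  // sum of the addRecords(...) return values

// prints 6: the sensor records 6 "skipped" records although none were skipped
System.out.println(recordsCount - numAddedRecords);
{code}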

Bug fix proposal:
In the method 
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}
change:
{{final int oldQueueSize = partitionGroup.numBuffered();}}
to
{{final int oldQueueSize = partitionGroup.numBuffered(partition);}}
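
For clarity, the whole method after the proposed one-line change would look 
like this (a sketch; it assumes {{PartitionGroup}} exposes the per-partition 
{{numBuffered(TopicPartition)}} overload the proposal calls):

{code:java}
    public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        // count only this partition's queue, so the returned delta is per-partition
        final int oldQueueSize = partitionGroup.numBuffered(partition);
        final int newQueueSize = partitionGroup.addRawRecords(partition, records);

        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

        // pause consumption for this partition once its queue grows beyond the threshold
        if (newQueueSize > this.maxBufferedSize) {
            consumer.pause(singleton(partition));
        }

        return newQueueSize - oldQueueSize;
    }
{code}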

If this is the cause of the bug, can I open a pull request?

> Kafka Streams skipped-records-rate sensor producing nonzero values even when 
> FailOnInvalidTimestamp is used as extractor
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5055
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5055
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nikki Thean
>            Assignee: Guozhang Wang
>
> According to the code and the documentation for this metric, the only reason 
> for a skipped record is an invalid timestamp, except that a) I am reading 
> from a topic that is populated solely by Kafka Connect and b) I am using 
> `FailOnInvalidTimestamp` as the timestamp extractor.
> Either I'm missing something in the documentation (i.e. another reason for 
> skipped records) or there is a bug in the code that calculates this metric.


