aitozi commented on FLINK-6109:

[~tzulitai] you mentioned that the calculate of the lag is overwhelming, but we 
can fetch the kafka lag after kafka version 0.10.2 and dont need to calculate , 
as i mentioned in https://github.com/apache/flink/pull/4935

> Add "consumer lag" report metric to FlinkKafkaConsumer
> ------------------------------------------------------
>                 Key: FLINK-6109
>                 URL: https://issues.apache.org/jira/browse/FLINK-6109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: aitozi
>            Priority: Major
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).

This message was sent by Atlassian JIRA

Reply via email to