[ 
https://issues.apache.org/jira/browse/FLINK-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027693#comment-17027693
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8410:
--------------------------------------------

[~trohrmann] The problem still exists. I've updated the link in the description 
to reflect changes in the `Kafka09Fetcher`.
I've lowered the priority of this ticket, as it's not a critical issue.
Unfortunately, I have other, more important tasks at hand, so I will not be 
able to fix this in the foreseeable future. I will remove myself as assignee.

For others who are interested in picking this up:
the main change is to not let {{AbstractFetcher#OffsetGauge}} fetch the 
committed offsets from the offset state, since there is a lag between the 
offsets being updated in Flink state and offsets actually being committed back 
to Kafka.

Instead, {{AbstractFetcher#OffsetGauge}} should be integrated with the 
implementation of {{KafkaCommitCallback}}, so that the gauge value is updated 
only when {{FlinkKafkaConsumerBase#doCommitInternalOffsetsToKafka}} 
completes.
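
As a rough illustration of the idea above (not a patch against the actual 
connector code), the gauge could keep its own value and hand out a commit 
callback that publishes the offset only once the commit has been acknowledged. 
The {{Gauge}} and {{CommitCallback}} interfaces below are local stand-ins for 
Flink's {{Gauge}} and {{KafkaCommitCallback}}, and {{CommittedOffsetGauge}}, 
{{callbackFor}} and the {{-1}} sentinel are hypothetical names chosen for this 
sketch:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: all types here are simplified stand-ins, not Flink's real classes.
class CommittedOffsetGaugeSketch {

    /** Stand-in for Flink's metric gauge interface. */
    interface Gauge<T> {
        T getValue();
    }

    /** Stand-in for the connector's commit callback (success / failure hooks). */
    interface CommitCallback {
        void onSuccess();

        void onException(Throwable cause);
    }

    /** Hypothetical sentinel reported until the first commit has been acknowledged. */
    static final long NOT_COMMITTED = -1L;

    /** Gauge whose value changes only when a commit callback reports success. */
    static final class CommittedOffsetGauge implements Gauge<Long> {
        private final AtomicLong committed = new AtomicLong(NOT_COMMITTED);

        @Override
        public Long getValue() {
            return committed.get();
        }

        /** Creates a callback that publishes the given offset once its commit succeeds. */
        CommitCallback callbackFor(long offsetBeingCommitted) {
            return new CommitCallback() {
                @Override
                public void onSuccess() {
                    committed.set(offsetBeingCommitted);
                }

                @Override
                public void onException(Throwable cause) {
                    // Commit failed: keep reporting the last acknowledged offset.
                }
            };
        }
    }

    public static void main(String[] args) {
        CommittedOffsetGauge gauge = new CommittedOffsetGauge();

        // The offset is known from state, but the commit has not completed yet.
        CommitCallback callback = gauge.callbackFor(42L);
        System.out.println("before commit completes: " + gauge.getValue());

        // The commit is acknowledged; only now does the gauge change.
        callback.onSuccess();
        System.out.println("after commit completes: " + gauge.getValue());
    }
}
```

In this sketch a failed or superseded commit never reaches {{onSuccess}}, so 
the gauge keeps reporting the last offset that was actually acknowledged.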

> Kafka consumer's commitedOffsets gauge metric is prematurely set
> ----------------------------------------------------------------
>
>                 Key: FLINK-8410
>                 URL: https://issues.apache.org/jira/browse/FLINK-8410
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Metrics
>    Affects Versions: 1.3.2, 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> The {{committedOffset}} metric gauge value is set too early. It is set here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L236
> This sets the committed offset before the actual commit happens. When the 
> commit actually happens depends on whether offsets are committed 
> automatically at a periodic interval or on checkpoints. Moreover, in the 
> 0.9+ consumers, the {{KafkaConsumerThread}} may choose to supersede some 
> commit attempts if the commit takes longer than the commit interval.
> While the offset committed back to Kafka is not a critical value used by the 
> consumer, it would be best to have more accurate values for a Flink-shipped 
> metric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
