[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16802858#comment-16802858 ]

Shuyi Chen commented on FLINK-11912:
------------------------------------

Hi [~becket_qin], thanks a lot for your comments. The purpose of 
manualRegisteredMetricSet is not to remove the MetricGroup, but to prevent 
registering the same MetricGroup multiple times. The reason is that the 
per-partition metrics only become available in KafkaConsumer after it has 
been assigned its partitions and has started polling, so we can only register 
those metrics inside the consumer's polling run loop. Since it is a loop, we 
need to prevent the same metric from being registered again and again. 
Therefore, we add new entries to manualRegisteredMetricSet when new 
partitions are discovered, to signal that new metrics should become available 
for registration soon, and once a metric is successfully registered we remove 
its entry so it won't be re-registered on later iterations and hog the 
consumer thread. Let me know if you see it otherwise. 
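
To make the bookkeeping concrete, here is a rough sketch (not the actual 
patch). The helpers newlyAssignedPartitions and registerLagMetricIfAvailable 
are hypothetical, and the real proposal keys the set by Kafka MetricName 
rather than TopicPartition, but the add-on-discovery / remove-on-registration 
pattern is the same:

{code:java}
// Sketch only: fragment of a KafkaConsumerThread-style run loop.
// "newlyAssignedPartitions" and "registerLagMetricIfAvailable" are
// hypothetical helpers used here just to illustrate the bookkeeping.
private final Set<TopicPartition> manualRegisteredMetricSet = new HashSet<>();

public void run() {
    while (running) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);

        // Newly assigned partitions do not have their per-partition metrics
        // exposed by the KafkaConsumer yet, so remember them for later.
        for (TopicPartition tp : newlyAssignedPartitions()) {
            manualRegisteredMetricSet.add(tp);
        }

        // Attempt registration only while entries remain; a successfully
        // registered partition is removed, so later iterations skip it and
        // the loop never hogs the consumer thread with repeated lookups.
        if (!manualRegisteredMetricSet.isEmpty()) {
            manualRegisteredMetricSet.removeIf(this::registerLagMetricIfAvailable);
        }

        // ... existing logic: hand the records over to the fetcher ...
    }
}
{code}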

> Expose per partition Kafka lag metric in Flink Kafka connector
> --------------------------------------------------------------
>
>                 Key: FLINK-11912
>                 URL: https://issues.apache.org/jira/browse/FLINK-11912
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.7.2
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Major
>
> In production, it's important that we expose the per-partition Kafka lag 
> metric so that users can diagnose which Kafka partition is lagging. 
> However, although the per-partition Kafka lag metrics are available in 
> KafkaConsumer since 0.10.2, Flink is not able to register them properly 
> because the metrics only become available after the consumer starts polling 
> data from the partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) In the fetch loop, as the KafkaConsumer discovers new partitions, 
> manually add the MetricName for each partition that we want to register 
> into manualRegisteredMetricSet. 
> 3) In the fetch loop, check whether manualRegisteredMetricSet is empty. If 
> not, search for the corresponding metrics in KafkaConsumer; if found, 
> register them and remove the entries from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when new 
> partitions are discovered, and registration happens only once, as soon as 
> the KafkaConsumer exposes the metrics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
