[ https://issues.apache.org/jira/browse/FLINK-28790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574667#comment-17574667 ]

Valentina Predtechenskaya commented on FLINK-28790:
---------------------------------------------------

Are there any workarounds to prevent the producer metrics from going missing? I could use my own 
patched version of flink-connector-kafka, but I'd rather not :)

> Incorrect KafkaProducer metrics initialization
> ----------------------------------------------
>
>                 Key: FLINK-28790
>                 URL: https://issues.apache.org/jira/browse/FLINK-28790
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4, 1.15.1
>            Reporter: Valentina Predtechenskaya
>            Priority: Major
>
> Problem
> KafkaProducer Flink metrics behave unpredictably because the broker and topic 
> metrics are initialized concurrently with Flink's metric registration.
> Reproducing
> We first noticed the problem on our Flink cluster: the metric 
> KafkaProducer.outgoing-byte-rate periodically went missing (it was zero or 
> near zero) on several subtasks, while other subtasks reported it correctly. 
> The actual outgoing rate was the same across subtasks. This was clear from, 
> for example, the KafkaProducer.record-send-rate metric, which was fine on 
> every subtask, so the problem was definitely with the metric itself.
> After a long investigation we found the root cause of this behavior:
>  
> * KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
> [initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330]
>  Flink gauges that wrap the KafkaProducer metrics existing at that moment.
> * In its constructor, KafkaProducer [creates a 
> Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460]
>  to communicate with the brokers, starts a thread (kafka-producer-network-thread) 
> and runs the Sender in that separate thread.
> * Once the Sender is running, the topic- and broker-related metrics are 
> registered over some period of time. If they are registered quickly enough, 
> KafkaWriter sees them before its initialization finishes and wraps them as 
> Flink gauges. Otherwise, it does not.
> * [Some KafkaProducer 
> metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics]
>  at the producer level and at the broker level have the same name, for example 
> outgoing-byte-rate.
> * If two metrics have the same name, Flink's KafkaWriter 
> [overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360]
>  the metric held by the existing wrapper.
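A minimal, self-contained Java sketch of the mechanism described in the list above. It uses no Flink or Kafka classes: `FakeMetric`, `MutableWrapper` and `wrapByName` are hypothetical stand-ins, and the values are made-up numbers. Because the wrappers are keyed by metric name only, whether `outgoing-byte-rate` reports the producer-level value or a node-level one depends on which metrics already existed when the snapshot was taken.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of name-keyed metric wrapping (not Flink's actual code).
class MetricCollisionDemo {

    // Stand-in for a Kafka metric: its group, its name and its current value.
    static final class FakeMetric {
        final String group;
        final String name;
        final double value;

        FakeMetric(String group, String name, double value) {
            this.group = group;
            this.name = name;
            this.value = value;
        }
    }

    // Stand-in for a gauge that reports whatever metric it currently wraps.
    static final class MutableWrapper {
        FakeMetric current;

        MutableWrapper(FakeMetric current) {
            this.current = current;
        }

        double getValue() {
            return current.value;
        }
    }

    // Wraps every metric in the snapshot, keyed by name only: a later metric with
    // the same name replaces the one an existing wrapper points to.
    static Map<String, MutableWrapper> wrapByName(List<FakeMetric> snapshot) {
        Map<String, MutableWrapper> wrappers = new LinkedHashMap<>();
        for (FakeMetric m : snapshot) {
            MutableWrapper existing = wrappers.get(m.name);
            if (existing != null) {
                existing.current = m; // name collision: the group is ignored
            } else {
                wrappers.put(m.name, new MutableWrapper(m));
            }
        }
        return wrappers;
    }

    public static void main(String[] args) {
        FakeMetric producerLevel = new FakeMetric("producer-metrics", "outgoing-byte-rate", 1000.0);
        FakeMetric nodeLevel = new FakeMetric("producer-node-metrics", "outgoing-byte-rate", 3.0);

        // Snapshot taken before the network thread registered any node metrics.
        List<FakeMetric> early = List.of(producerLevel);
        // Snapshot taken after a node metric with the same name had appeared.
        List<FakeMetric> late = List.of(producerLevel, nodeLevel);

        System.out.println(wrapByName(early).get("outgoing-byte-rate").getValue()); // 1000.0
        System.out.println(wrapByName(late).get("outgoing-byte-rate").getValue());  // 3.0
    }
}
```

In this model the same job reports either the producer-wide rate or a single broker's rate, depending only on timing, which matches the "zero or near zero on some subtasks" symptom.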
> So, to reproduce this bug it is enough to run any job with a Kafka sink and 
> look at the KafkaProducer metrics: some of them will be missing (the broker 
> or topic ones), or some will be overwritten (like outgoing-byte-rate in the 
> example above).
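To see which names are affected, one could count how often each metric name occurs and list the names that occur more than once. The sketch below assumes the (group, name) pairs have already been collected, for example from `MetricName.group()` and `MetricName.name()` of the keys returned by `KafkaProducer.metrics()`; the helper `collidingNames` and the sample pairs are hypothetical. It counts occurrences rather than distinct groups, because per-broker metrics share both group and name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic helper; the (group, name) pairs are assumed to be collected elsewhere.
class DuplicateMetricNames {

    // Each entry is (group -> name). Returns name -> groups of every metric using that name,
    // keeping only names that occur more than once.
    static Map<String, List<String>> collidingNames(List<Map.Entry<String, String>> groupAndName) {
        Map<String, List<String>> groupsByName = new TreeMap<>();
        for (Map.Entry<String, String> e : groupAndName) {
            groupsByName.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        // A name that occurs only once cannot be overwritten by a name-keyed wrapper.
        groupsByName.values().removeIf(groups -> groups.size() < 2);
        return groupsByName;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> sample = List.of(
                Map.entry("producer-metrics", "outgoing-byte-rate"),
                Map.entry("producer-metrics", "request-rate"),
                Map.entry("producer-metrics", "buffer-available-bytes"),
                Map.entry("producer-node-metrics", "outgoing-byte-rate"),
                Map.entry("producer-node-metrics", "request-rate"));
        System.out.println(collidingNames(sample));
    }
}
```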
> I think there are at least two ways to fix it:
> 1. Add the metric group (producer-metrics, producer-node-metrics, etc.) to 
> the Flink metric name.
> 2. Use only metrics with group=producer-metrics and ignore all other groups, 
> i.e. do not expose the broker and topic metrics at all.
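A rough sketch of the two options above, on hypothetical stand-in types (`FakeMetric`, `scopedNames` and `producerLevelOnly` are illustrative names, not Flink or Kafka APIs). One assumption goes beyond the proposal: node-level metrics share both group and name and differ only by their node-id tag, so the option 1 sketch also appends the remaining tags. With the group alone, per-broker metrics would still collide with each other.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the two proposed fixes; not Flink's or Kafka's actual code.
class ProposedFixes {

    // Stand-in for a Kafka metric; tags play the role of node-id / topic (client-id omitted).
    static final class FakeMetric {
        final String group;
        final String name;
        final Map<String, String> tags;
        final double value;

        FakeMetric(String group, String name, Map<String, String> tags, double value) {
            this.group = group;
            this.name = name;
            this.tags = tags;
            this.value = value;
        }
    }

    // Option 1: qualify each exposed name with its group and (assumption) its tags.
    static Map<String, Double> scopedNames(List<FakeMetric> metrics) {
        Map<String, Double> exposed = new LinkedHashMap<>();
        for (FakeMetric m : metrics) {
            StringBuilder key = new StringBuilder(m.group);
            // Sort tags so the generated name does not depend on map iteration order.
            for (Map.Entry<String, String> tag : new TreeMap<>(m.tags).entrySet()) {
                key.append('.').append(tag.getKey()).append('=').append(tag.getValue());
            }
            exposed.put(key.append('.').append(m.name).toString(), m.value);
        }
        return exposed;
    }

    // Option 2: expose producer-level metrics only and skip node- and topic-level ones.
    static Map<String, Double> producerLevelOnly(List<FakeMetric> metrics) {
        Map<String, Double> exposed = new LinkedHashMap<>();
        for (FakeMetric m : metrics) {
            if ("producer-metrics".equals(m.group)) {
                exposed.put(m.name, m.value);
            }
        }
        return exposed;
    }

    public static void main(String[] args) {
        List<FakeMetric> metrics = List.of(
                new FakeMetric("producer-metrics", "outgoing-byte-rate", Map.of(), 1000.0),
                new FakeMetric("producer-node-metrics", "outgoing-byte-rate", Map.of("node-id", "node-1"), 600.0),
                new FakeMetric("producer-node-metrics", "outgoing-byte-rate", Map.of("node-id", "node-2"), 400.0));
        System.out.println(scopedNames(metrics));
        System.out.println(producerLevelOnly(metrics));
    }
}
```

Option 1 keeps every metric visible at the cost of longer, less stable names. Option 2 keeps the existing names but drops the broker and topic metrics entirely.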
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
