[ 
https://issues.apache.org/jira/browse/FLINK-35283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35283:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add support unique Kafka producer client ids 
> ---------------------------------------------
>
>                 Key: FLINK-35283
>                 URL: https://issues.apache.org/jira/browse/FLINK-35283
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Francis
>            Priority: Not a Priority
>              Labels: pull-request-available
>
> This issue came out of debuging a warning we're seeing in our Flink logs. 
> We're running Flink 1.18 and have an application that uses Kafka topics as a 
> source and a sink. We're running with several tasks. The warning we're seeing 
> in the logs is:
> {{WARN org.apache.kafka.common.utils.AppInfoParser - Error registering 
> AppInfo mbean}}
> {{javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=app-info,id=kafka producer client id}}
> I've spent a bit of time debugging, and it looks like the root cause of this 
> warning is the Flink {{KafkaSink}} creating multiple {{{}KafkaWriter{}}}s 
> that, in turn, create multiple {{{}KafkaProducer{}}}s with the same Kafka 
> producer `{{{}client.id{}}}`. Since the value for {{client.id}} is used when 
> registering the {{AppInfo}} MBean — when multiple {{{}KafkaProducer{}}}s with 
> the same {{client.id}} are registered we get the above 
> {{{}InstanceAlreadyExistsException{}}}. Since we're running with several 
> tasks and we get a Kafka producer per task this duplicate registration 
> exception makes sense to me.
> I'm wondering if the fix would be to update the {{KafkaSink.builder}} by 
> adding a {{setClientIdPrefix}} method, similar to what we have already on the 
> {{{}KafkaSource.builder{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to