[
https://issues.apache.org/jira/browse/FLINK-35283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-35283:
-----------------------------------
Labels: pull-request-available (was: )
> Add support for unique Kafka producer client ids
> ---------------------------------------------
>
> Key: FLINK-35283
> URL: https://issues.apache.org/jira/browse/FLINK-35283
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Francis
> Priority: Not a Priority
> Labels: pull-request-available
>
> This issue came out of debugging a warning we're seeing in our Flink logs.
> We're running Flink 1.18 and have an application that uses Kafka topics as a
> source and a sink. We're running with several tasks. The warning we're seeing
> in the logs is:
> {{WARN org.apache.kafka.common.utils.AppInfoParser - Error registering
> AppInfo mbean}}
> {{javax.management.InstanceAlreadyExistsException:
> kafka.producer:type=app-info,id=kafka producer client id}}
> I've spent a bit of time debugging, and it looks like the root cause of this
> warning is the Flink {{KafkaSink}} creating multiple {{KafkaWriter}}s that,
> in turn, create multiple {{KafkaProducer}}s with the same Kafka producer
> {{client.id}}. The value of {{client.id}} is used when registering the
> {{AppInfo}} MBean, so when multiple {{KafkaProducer}}s with the same
> {{client.id}} are registered we get the above
> {{InstanceAlreadyExistsException}}. Since we're running with several tasks
> and we get a Kafka producer per task, this duplicate registration exception
> makes sense to me.
> I'm wondering if the fix would be to update the {{KafkaSink.builder}} by
> adding a {{setClientIdPrefix}} method, similar to what we already have on
> the {{KafkaSource.builder}}.
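
The duplicate-registration behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not Kafka's or Flink's actual code: the class `ClientIdMBeanDemo`, the `demo.producer` JMX domain, and the helper `uniqueClientId` (prefix plus subtask index) are hypothetical names chosen for illustration. It only shows that registering two MBeans under the same `id` fails, while a per-subtask suffix avoids the collision.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

final class ClientIdMBeanDemo {
    // Standard MBean pair: the interface name must be the class name + "MBean".
    public interface AppInfoMBean { String getVersion(); }
    public static class AppInfo implements AppInfoMBean {
        @Override public String getVersion() { return "demo"; }
    }

    // Hypothetical naming scheme: one client id per sink subtask.
    static String uniqueClientId(String prefix, int subtaskId) {
        return prefix + "-" + subtaskId;
    }

    // Returns true if registration succeeded, false if the name was already taken.
    static boolean register(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName(
                "demo.producer:type=app-info,id=" + ObjectName.quote(clientId));
            server.registerMBean(new AppInfo(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false; // same object name registered twice
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Use a private MBean server so the platform server is left untouched.
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        System.out.println(register(server, "shared-id")); // first producer
        System.out.println(register(server, "shared-id")); // second producer, same id
        System.out.println(register(server, uniqueClientId("sink", 0)));
        System.out.println(register(server, uniqueClientId("sink", 1)));
    }
}
```

If a {{setClientIdPrefix}}-style option were added to the sink, each writer could derive its {{client.id}} in a similar way, so that every producer registers its MBean under a distinct name.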
--
This message was sent by Atlassian Jira
(v8.20.10#820010)