[
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22452:
-----------------------------------
Labels: stale-major (was: )
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Major but is unassigned, and neither it nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If
this ticket is a Major, please either assign yourself or give an update.
Afterwards, please remove the label, or the issue will be deprioritized in 7 days.
> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> -----------------------------------------------------------------------
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.12.2
> Reporter: Wenhao Ji
> Priority: Major
> Labels: stale-major
>
> Currently, the "transactional.id"s of the Kafka producers in
> FlinkKafkaProducer are generated based on the task name. This mechanism has
> some limitations:
> * The generated id exceeds Kafka's limit if the task name is too long.
> (resolved in FLINK-17691)
> * The ids are very likely to clash with each other if the job topologies are
> similar. (discussed in FLINK-11654)
> * Only certain "transactional.id"s may be authorized by [Prefixed
> ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
> on the target Kafka cluster, so ids derived from the task name may be rejected.
> Besides, the Spring community has introduced the
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
> method in its Kafka client.
> Therefore, I think this feature is needed in the Flink Kafka connector.
>
> As discussed in FLINK-11654, the possible solutions are to
> * either introduce an additional method called
> setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
> * or use the existing "transactional.id" property as the prefix.
> The "transactional.id" generation would then behave as follows:
> * if no prefix is set, keep the current behavior,
> * if a prefix is set, use it as the prefix for the TransactionalIdsGenerator.
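
The proposed behavior can be illustrated with a minimal, self-contained sketch. It is not the actual FlinkKafkaProducer or TransactionalIdsGenerator code: the class TransactionalIdPrefixSketch, its methods, the "prefix-index" id layout, and the task-name-plus-operator-id fallback are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the proposal; not the real Flink connector code. */
public class TransactionalIdPrefixSketch {

    /**
     * Returns the user-supplied prefix if one is present and non-empty;
     * otherwise falls back to a task-name-based prefix (assumed layout).
     */
    public static String resolvePrefix(
            Optional<String> userPrefix, String taskName, String operatorId) {
        return userPrefix
                .filter(p -> !p.isEmpty())
                .orElse(taskName + "-" + operatorId);
    }

    /**
     * Generates poolSize ids for one subtask. Each subtask gets a disjoint
     * index range so that ids do not clash within a single job.
     */
    public static List<String> generateIds(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>(poolSize);
        long start = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (start + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // No prefix configured: behavior stays task-name based.
        String legacy = resolvePrefix(Optional.empty(), "Sink: kafka", "abc123");
        // Prefix configured, e.g. one matching a Prefixed ACL on the cluster.
        String custom = resolvePrefix(Optional.of("team-a.orders"), "Sink: kafka", "abc123");

        System.out.println(generateIds(legacy, 1, 2));
        System.out.println(generateIds(custom, 1, 2));
    }
}
```

With a user-chosen prefix, two jobs with similar topologies can be given distinct prefixes, and the prefix can be picked to match whatever the cluster's Prefixed ACLs authorize.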
--
This message was sent by Atlassian Jira
(v8.3.4#803005)