[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392957#comment-17392957
 ] 

Piotr Nowojski commented on FLINK-22452:
----------------------------------------

What about adding a list of additional things that could be configured at the 
beginning, after explaining the constructor? We could add something like:

Other notable configuration options when using {{FlinkKafkaProducer}} are:
Additionally you can also configure via setter methods the following parameters:
* selecting a custom transactional.id prefix (link to setTransactionalIdPrefix)
* log and ignore failures (setLogFailuresOnly)
* ... (setWriteTimestampToKafka)
* ... (ignoreFailuresAfterTransactionTimeout)

and to avoid duplicating the java docs, just add a hyperlink to the JavaDocs, 
like:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.html#setLogFailuresOnly-boolean-

?

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> -----------------------------------------------------------------------
>
>                 Key: FLINK-22452
>                 URL: https://issues.apache.org/jira/browse/FLINK-22452
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.2
>            Reporter: Wenhao Ji
>            Assignee: Wenhao Ji
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Currently, the "transactional.id"s of the Kafka producers in 
> FlinkKafkaProducer are generated based on the task name. This mechanism has 
> some limitations:
>  * It will exceed Kafka's limitation if the task name is too long. (resolved 
> in FLINK-17691)
>  * They will very likely clash each other if the job topologies are similar. 
> (discussed in FLINK-11654)
>  * Only certain "transactional.id" may be authorized by [Prefixed 
> ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls]
>  on the target Kafka cluster.
> Besides, the spring community has introduced the 
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
>  method to their Kafka client.
> Therefore, I think it will be necessary to have this feature in the Flink 
> Kafka connector. 
>  
> As discussed in FLINK-11654, the possible solution will be,
>  * either introduce an additional method called 
> setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
>  * or use the existing "transactional.id" properties as the prefix.
>  And the behavior of the "transactional.id" generation will be
>  * keep the behavior as it was if absent,
>  * use the one if present as the prefix for the TransactionalIdsGenerator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to