[ 
https://issues.apache.org/jira/browse/FLINK-39996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18091680#comment-18091680
 ] 

Shekhar Prasad Rajak commented on FLINK-39996:
----------------------------------------------

I am looking into this. 

But we have to work on these items first 

https://issues.apache.org/jira/browse/KAFKA-20739

https://issues.apache.org/jira/browse/KAFKA-20738

> Migrate KafkaSink EOS recovery to Kafka public PreparedTxnState API
> -------------------------------------------------------------------
>
>                 Key: FLINK-39996
>                 URL: https://issues.apache.org/jira/browse/FLINK-39996
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-5.0.0
>            Reporter: Shekhar Prasad Rajak
>            Priority: Major
>             Fix For: kafka-5.1.0
>
>
> Flink KafkaSink exactly-once recovery currently persists transactionalId, 
> producerId, and epoch in KafkaCommittable, then uses 
> FlinkKafkaInternalProducer.resumeTransaction(...) to restore KafkaProducer
>   internals via reflection before calling commitTransaction().
>   This works today, but it depends on Kafka private internals and is fragile 
> across Kafka client changes. Kafka now has a public 2PC-oriented API shape:
>   PreparedTxnState state = producer.prepareTransaction();
>   and intended recovery flow:
>   producer.initTransactions(true);
>   producer.completeTransaction(new PreparedTxnState(serializedState));
>   Flink should eventually migrate KafkaSink EOS recovery to this public API 
> once Kafka client/broker supports keepPreparedTxn=true end-to-end.
>  
>  
> Current Behavior
>   - ExactlyOnceKafkaWriter.prepareCommit() creates KafkaCommittable from 
> producer id/epoch/transactional id.
>   - FlinkKafkaInternalProducer.precommitTransaction() only marks local Flink 
> wrapper state.
>   - KafkaCommitter resumes the transaction through 
> FlinkKafkaInternalProducer.resumeTransaction(...).
>   - resumeTransaction(...) mutates KafkaProducer internals via reflection.
>  
>  Expected Behavior
>   When the Kafka dependency supports public prepared transaction recovery, 
> Flink should persist:
>   transactionalId + PreparedTxnState
>   and recover/commit using public Kafka APIs:
>   producer.initTransactions(true);
>   producer.completeTransaction(new PreparedTxnState(serializedPreparedState));



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to