[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16059699#comment-16059699
 ] 

Piotr Nowojski commented on FLINK-6988:
---------------------------------------

Unfortunately, KafkaProducer's API is very limited. In particular, it does not 
allow us to implement a two-phase commit protocol the way BucketingSink does, 
because it allows neither resuming nor committing a transaction from a 
different worker after a crash (the last bullet point in the issue 
description). This is because every time a user calls 
`Producer::initTransactions()`, all pending (uncommitted) transactions are 
automatically aborted by the Kafka server. Calling 
`Producer::initTransactions()` is necessary to obtain the `producerId` and 
`epoch` values from the Kafka server, which are crucial for manipulating 
transactions.

Fortunately, there is a workaround for this issue. Kafka's wire protocol (the 
request API between client and broker) seems to be more flexible than the 
KafkaProducer class, and it should be possible to resume transactions. Every 
time we begin a transaction, we can store `producerId` and `epoch` in the 
state. If we need to commit a pending transaction on another worker (after a 
crash), then instead of calling `KafkaProducer::initTransactions()` we can 
restore `producerId` and `epoch` from the state and commit the pending 
transaction using those restored values.
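
To make the difference between the two recovery paths concrete, here is a toy 
in-memory model. It is only a sketch: `ToyCoordinator`, `Handle` and all method 
names are hypothetical and are not Kafka classes, and the epoch bump on 
re-initialization is a simplification of what the server does.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the server-side transaction bookkeeping (hypothetical, not Kafka code). */
class ToyCoordinator {
    enum TxnState { ONGOING, COMMITTED, ABORTED }

    /** The pair of values a sink would have to keep in its checkpointed state. */
    static final class Handle {
        final long producerId;
        final short epoch;
        Handle(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, Handle> current = new HashMap<>();
    private final Map<String, TxnState> states = new HashMap<>();
    private long nextProducerId = 1000L;

    /** Models initTransactions(): aborts an open transaction and hands out a new epoch. */
    Handle initTransactions(String transactionalId) {
        if (states.get(transactionalId) == TxnState.ONGOING) {
            states.put(transactionalId, TxnState.ABORTED);
        }
        Handle old = current.get(transactionalId);
        Handle fresh = (old == null)
                ? new Handle(nextProducerId++, (short) 0)
                : new Handle(old.producerId, (short) (old.epoch + 1));
        current.put(transactionalId, fresh);
        return fresh;
    }

    void beginTransaction(String transactionalId) {
        states.put(transactionalId, TxnState.ONGOING);
    }

    /** Models committing with restored identifiers, without calling initTransactions(). */
    boolean commit(String transactionalId, Handle restored) {
        Handle cur = current.get(transactionalId);
        boolean valid = cur != null
                && cur.producerId == restored.producerId
                && cur.epoch == restored.epoch
                && states.get(transactionalId) == TxnState.ONGOING;
        if (valid) {
            states.put(transactionalId, TxnState.COMMITTED);
        }
        return valid;
    }

    TxnState stateOf(String transactionalId) {
        return states.get(transactionalId);
    }
}
```

In this model, a recovering worker that calls `commit` with the stored `Handle` 
completes the pending transaction, while one that calls `initTransactions` 
first finds the transaction already aborted and its old handle rejected.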

The "hacky" part is that the `producerId` and `epoch` values are hidden behind 
private fields in package-private classes. That means we cannot subclass 
`KafkaProducer` to obtain or set them. That leaves us with two options. We 
either reimplement KafkaProducer on top of Kafka's wire protocol (we could 
copy/paste most of their code) or we use JVM reflection to manipulate the 
official KafkaProducer class directly.
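
As a rough illustration of the reflection option, the sketch below reads and 
writes private fields on a stand-in class. `OpaqueTxnState` and 
`ReflectionAccess` are hypothetical names invented for this example; the real 
field names and the classes that hold them inside the Kafka client are not 
shown here and would have to be looked up in its source.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a class that hides its identifiers in private fields. */
class OpaqueTxnState {
    private long producerId = -1L;
    private short epoch = -1;
}

/** Small helpers for reading and writing private fields by name. */
final class ReflectionAccess {
    private ReflectionAccess() {}

    static Object get(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }

    static void set(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier
            field.set(target, value);  // boxed values are unwrapped for primitive fields
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot write field " + fieldName, e);
        }
    }
}
```

For example, `ReflectionAccess.set(state, "producerId", 42L)` followed by 
`ReflectionAccess.get(state, "producerId")` round-trips the value. The obvious 
downside is that such code breaks silently whenever the private fields are 
renamed in a new client release.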

> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting 
> "exactly-once" semantics. The API changes and the overall transaction support 
> are described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The 
> new FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in 
> the state, and write all incoming data to the output topic using that 
> transaction
> * on a `snapshotState` call, flush the data and record in the state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 
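
The lifecycle in the bullet points above can be sketched without any Kafka or 
Flink dependency. Everything here is hypothetical: `TxnBackend`, 
`InMemoryBackend` and `TwoPhaseCommitSinkSketch` are invented for illustration, 
and opening a fresh transaction inside `snapshotState` is an assumption that 
the description does not spell out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical transactional backend; stands in for whatever the sink writes to. */
interface TxnBackend {
    String begin();
    void write(String txnId, String record);
    void flush(String txnId);
    void commit(String txnId);
    void abort(String txnId);
}

/** Trivial backend that only records which transactions were committed or aborted. */
class InMemoryBackend implements TxnBackend {
    final List<String> committed = new ArrayList<>();
    final List<String> aborted = new ArrayList<>();
    private int counter = 0;

    public String begin() { return "txn-" + (counter++); }
    public void write(String txnId, String record) { /* buffered in a real backend */ }
    public void flush(String txnId) { /* would push buffered records to the server */ }
    public void commit(String txnId) { committed.add(txnId); }
    public void abort(String txnId) { aborted.add(txnId); }
}

class TwoPhaseCommitSinkSketch {
    private final TxnBackend backend;
    private final List<String> pendingCommit = new ArrayList<>();
    private String currentTxn;

    TwoPhaseCommitSinkSketch(TxnBackend backend) {
        this.backend = backend;
        this.currentTxn = backend.begin(); // begin a transaction upon creation
    }

    void invoke(String record) {
        backend.write(currentTxn, record);
    }

    /** Pre-commit phase: flush, mark as pending, return the ids to store in state. */
    List<String> snapshotState() {
        backend.flush(currentTxn);
        pendingCommit.add(currentTxn);
        currentTxn = backend.begin(); // assumption: later records go to a new transaction
        return new ArrayList<>(pendingCommit);
    }

    /** Commit phase: every participant has saved its snapshot. */
    void notifyCheckpointComplete() {
        for (String txnId : pendingCommit) {
            backend.commit(txnId);
        }
        pendingCommit.clear();
    }

    /** Recovery after a crash between snapshotState and notifyCheckpointComplete. */
    static void recover(TxnBackend backend, List<String> restoredPending,
                        boolean checkpointCompleted) {
        for (String txnId : restoredPending) {
            if (checkpointCompleted) {
                backend.commit(txnId);
            } else {
                backend.abort(txnId);
            }
        }
    }
}
```

The `recover` step is exactly the part that needs to commit a transaction begun 
by a different, crashed worker, which is what the plain KafkaProducer API does 
not allow.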



