ASF GitHub Bot commented on FLINK-6988:

Github user rangadi commented on the issue:

    May be an extra shuffle to make small batches could help. Another option is 
to buffer all the records in state and write them all inside commit(). But not 
sure how costly it is to save all the records in checkpointed state. 
    Another issue I see with using random txn id : if a worker looks 
unresponsive and work is moved to another worker, it is possible that the old 
worker still lingers around with open transaction. That would imply it the 
exactly-once consumers can not read past that transaction as long as it is open.
    I didn't know it was possible to resume a transaction since it was not part 
of producer API. This PR uses an undocumented way to do it.. do you know if 
Kafka Streams also does something like that? May be the producer will support 
`resumeTransaction()` properly in future.

> Add Apache Kafka 0.11 connector
> -------------------------------
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
> Kafka 0.11 (it will be released very soon) add supports for transactions. 
> Thanks to that, Flink might be able to implement Kafka sink supporting 
> "exactly-once" semantic. API changes and whole transactions support is 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New 
> FlinkKafkaProducer011 would 
> * upon creation begin transaction, store transaction identifiers into the 
> state and would write all incoming data to an output Kafka topic using that 
> transaction
> * on `snapshotState` call, it would flush the data and write in state 
> information that current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 

This message was sent by Atlassian JIRA

Reply via email to