syhily commented on pull request #17452:
URL: https://github.com/apache/flink/pull/17452#issuecomment-1030376275


   > The only concern is my limited knowledge of Pulsar transactions. Maybe you 
can clarify the following scenarios.
   >
   > The job opens a transaction and sends records, now suddenly fails before 
the first checkpoint. The pipeline would restart and try to use the same 
transaction again. Do we generate duplicates in this case because the records 
are replayed?
   > The job did a successful checkpoint and the transaction is moved to the
committer. Before the committer can commit we write new records in the same
transaction (I understood that we reuse the transaction for a topic). Does the
committer now also commit the records that were received after the checkpoint?
   
   The scenarios you describe can't happen with the Pulsar sink. Let me explain
each one in detail.
   
   1. PulsarWriter only implements `PrecommittingSinkWriter`, which has no
writer state. If the pipeline restarts before the first checkpoint, all of the
open transactions are abandoned and new transactions are created instead. The
abandoned transactions remain on Pulsar until they reach their timeout. Because
they are never committed, the records written in them never become visible to
consumers, so the replayed records are not duplicated. A Pulsar transaction is
just a changelog-based system, so creating a lot of unused transactions is
acceptable.
   
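   2. Once the transactions are passed to the Committer after a successful
checkpoint, PulsarWriter can no longer touch them: PulsarWriter clears its
internal transaction register when `PulsarWriter#prepareCommit` is called.
Records received after the checkpoint therefore go into new transactions, and
the Committer only commits the records written before the checkpoint.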
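A toy, in-memory model can make both answers concrete. This is only a simplified sketch, not the actual sink: `ToyBroker`, `ToyWriter`, and `PulsarSinkToyModel` are hypothetical names, it does not use the Flink or Pulsar client APIs, it assumes one open transaction per topic, and it models the broker-side transaction timeout as an explicit `expireUncommitted()` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for Pulsar's transaction coordinator (not the Pulsar API).
class ToyBroker {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> visible = new ArrayList<>();
    private long nextId = 0;

    long newTransaction() {
        long id = nextId++;
        pending.put(id, new ArrayList<>());
        return id;
    }

    void send(long txnId, String record) {
        pending.get(txnId).add(record);
    }

    // Committing makes the transaction's records visible to consumers.
    void commit(long txnId) {
        visible.addAll(pending.remove(txnId));
    }

    // Stand-in for the timeout: every still-pending transaction is dropped
    // and its records never become visible.
    void expireUncommitted() {
        pending.clear();
    }

    List<String> visibleRecords() {
        return new ArrayList<>(visible);
    }
}

// Hypothetical writer with no snapshot state: one open transaction per topic.
class ToyWriter {
    private final ToyBroker broker;
    private final Map<String, Long> openTxns = new HashMap<>();

    ToyWriter(ToyBroker broker) {
        this.broker = broker;
    }

    void write(String topic, String record) {
        long txn = openTxns.computeIfAbsent(topic, t -> broker.newTransaction());
        broker.send(txn, record);
    }

    // Mirrors the described prepareCommit: hand the transactions off, then clear the register.
    List<Long> prepareCommit() {
        List<Long> committables = new ArrayList<>(openTxns.values());
        openTxns.clear();
        return committables;
    }
}

class PulsarSinkToyModel {
    // Scenario 1: failure before the first checkpoint, then the source replays "a", "b".
    static List<String> restartBeforeFirstCheckpoint() {
        ToyBroker broker = new ToyBroker();
        ToyWriter firstAttempt = new ToyWriter(broker);
        firstAttempt.write("topic", "a");
        firstAttempt.write("topic", "b");
        // No writer state to restore: the restarted writer starts with an empty register.
        ToyWriter secondAttempt = new ToyWriter(broker);
        secondAttempt.write("topic", "a");
        secondAttempt.write("topic", "b");
        secondAttempt.prepareCommit().forEach(broker::commit);
        broker.expireUncommitted(); // the first attempt's transaction times out
        return broker.visibleRecords();
    }

    // Scenario 2: a record arrives after the checkpoint but before the commit.
    static List<String> visibleAfterFirstCommit() {
        ToyBroker broker = new ToyBroker();
        ToyWriter writer = new ToyWriter(broker);
        writer.write("topic", "c");
        List<Long> checkpoint1 = writer.prepareCommit(); // handed to the committer
        writer.write("topic", "d"); // lands in a new transaction
        checkpoint1.forEach(broker::commit);
        return broker.visibleRecords();
    }
}
```

In the first scenario only the second attempt's `a` and `b` become visible, once each. In the second, `d` sits in a fresh transaction until the next checkpoint, so the first commit exposes only `c`.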


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
