imaffe edited a comment on pull request #17452: URL: https://github.com/apache/flink/pull/17452#issuecomment-1033486400
> > The only concern is my limited knowledge of Pulsar transactions. Maybe you can clarify the following scenarios.
> >
> > The job opens a transaction, sends records, and then suddenly fails before the first checkpoint. The pipeline would restart and try to use the same transaction again. Do we generate duplicates in this case because the records are replayed?
> >
> > The job did a successful checkpoint and the transaction is moved to the committer. Before the committer can commit, we write new records in the same transaction (I understood that we reuse the transaction for a topic). Does the committer now also commit the records that were received after the checkpoint?
>
> The scenarios you describe can't happen in the Pulsar sink. Allow me to explain them in detail.
>
> 1. `PulsarWriter` only implements `PrecommittingSinkWriter`, which doesn't have any writer state. If the pipeline restarts before the first checkpoint, all of its transactions are abandoned and new transactions are created instead. The previous transactions are kept on Pulsar until they time out. A Pulsar transaction is just a changelog-based system, so creating a lot of unused transactions is acceptable.
> 2. Once the transactions are passed to the Committer after a successful checkpoint on `PulsarWriter`, they can no longer be touched by `PulsarWriter`. `PulsarWriter` clears its internal transaction register when `PulsarWriter#prepareCommit` is called.

EDIT: I took another look, and my original understanding was wrong. It seems commit is not tied to checkpoints; it's controlled by the Flink

I have another question regarding latency. If the transactions are committed only on checkpoints, say with a checkpoint interval of 1 minute and a small data volume, does that mean a Pulsar consumer won't be able to consume data produced by the Pulsar sink until the 1-minute interval passes (i.e., data sent within that minute only becomes visible to consumers once the interval has passed)?
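The hand-off described in point 2, and why it matters for the latency question, can be sketched with a toy in-memory model. This is a hypothetical simulation, not the Flink or Pulsar API: `Txn`, `Broker`, `Writer`, `prepare_commit` and `commit` are illustrative names. It assumes consumers read only records from committed transactions, and that transactions are committed only when a checkpoint completes.

```python
from dataclasses import dataclass, field


@dataclass
class Txn:
    """Hypothetical stand-in for a Pulsar transaction (not the real client API)."""
    txn_id: int
    records: list = field(default_factory=list)
    committed: bool = False


class Broker:
    """Toy broker: consumers only see records from committed transactions."""

    def __init__(self):
        self.txns = []

    def new_txn(self):
        txn = Txn(txn_id=len(self.txns))
        self.txns.append(txn)
        return txn

    def visible_records(self):
        # Read-committed view (assumption): records in open transactions stay hidden.
        return [r for t in self.txns if t.committed for r in t.records]


class Writer:
    """Hypothetical writer keeping one open transaction per topic."""

    def __init__(self, broker):
        self.broker = broker
        self.open_txns = {}  # topic -> Txn, the "transaction register"

    def write(self, topic, record):
        txn = self.open_txns.get(topic)
        if txn is None:
            txn = self.broker.new_txn()
            self.open_txns[topic] = txn
        txn.records.append(record)

    def prepare_commit(self):
        # Hand the open transactions to the committer and clear the register,
        # so records written after this point go into fresh transactions.
        handed_off = list(self.open_txns.values())
        self.open_txns.clear()
        return handed_off


def commit(txns):
    """Toy committer: marks the handed-off transactions as committed."""
    for txn in txns:
        txn.committed = True


broker = Broker()
writer = Writer(broker)
writer.write("topic-a", "r1")
pending = writer.prepare_commit()  # checkpoint barrier reached
writer.write("topic-a", "r2")      # goes into a new transaction
print(broker.visible_records())    # -> []
commit(pending)                    # checkpoint completes
print(broker.visible_records())    # -> ['r1']; r2 waits for the next checkpoint
```

In this model, records written after `prepare_commit` can never end up in a transaction the committer already holds, and a record only becomes visible once the checkpoint following it completes, so visibility latency can approach one checkpoint interval regardless of data volume.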
