yjshen commented on issue #6161: Cloning subscriptions
URL: https://github.com/apache/pulsar/issues/6161#issuecomment-581229721
 
 
   If I understand it correctly, by cloning a subscription, you want to 
"bookmark" a reading position for a partition, and you could start from it 
later on? If this is the situation, I suggest you use the `Reader` API instead 
of `Consumer` in tasks and maintain read position for each commit internally in 
your app. 
   
   Here is what I've done in flink-pulsar-connector:
   1. an app-wide durable subscription for each partition, used to prevent 
Pulsar from deleting messages eagerly.
   2. For each partition you gonna read, use reader API to do message 
processing, by seeking to a position (either restored from a checkpoint or from 
an initial position), and report its reading position when a checkpoint is 
triggered by JobManager.
   3. When a checkpoint is done, reset the cursor of the durable subscription 
to the new position, notify Pulsar that the app has consume the batch of 
messages successfully and Pulsar is free to delete those messages.
   
   You could check https://github.com/streamnative/pulsar-flink to see if it 
fulfills your requirement to use flink over pulsar. A more detailed description 
of the connector could be found here: 
https://medium.com/streamnative/use-apache-pulsar-as-streaming-table-with-8-lines-of-code-39033a93947f
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to