Jiabao-Sun opened a new pull request, #11: URL: https://github.com/apache/flink-connector-mongodb/pull/11
[Change streams](https://www.mongodb.com/docs/manual/changeStreams/#change-streams) allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Change streams are available for [replica sets](https://www.mongodb.com/docs/manual/replication/) and [sharded clusters](https://www.mongodb.com/docs/manual/sharding/).

We can use the MongoDB change streams feature to support unbounded streaming reads in the MongoDB connector.

```sql
SET 'execution.checkpointing.interval' = '3s';

CREATE TABLE orders (
  `_id` STRING,
  `code` STRING,
  `quantity` BIGINT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://mongodb:27017',
  'database' = 'test_unbounded',
  'collection' = 'orders',
  -- read collection's snapshot data and then continuously read changed data.
  'scan.startup.mode' = 'initial'
);

SELECT * FROM orders;
```

### Startup Mode

We can determine whether the source runs in bounded or unbounded mode by setting the `scan.startup.mode` option.

- bounded: read the collection's snapshot data only and do not read changed data.
- initial: read the collection's snapshot data and then continuously read changed data.
- latest-offset: continuously read changed data starting from the latest offset of the oplog.
- timestamp: continuously read changed data starting from a specified timestamp offset of the oplog.

### Changelog Mode

#### UPSERT

Before MongoDB 6.0, pre- and post-images were not saved in the oplog. This means that we cannot directly obtain the complete record before and after a change, which is what an ALL mode changelog requires. By default, change streams return only the delta of the changed fields for update operations.
However, we can configure the change stream to return the most current majority-committed version of the updated document, using the [update lookup](https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations) feature, and use it to generate an UPSERT mode changelog. Before MongoDB 6.0, we can set 'change-stream.full-document.strategy' to 'update-lookup', which is also the default.

```properties
'change-stream.full-document.strategy' = 'update-lookup'
```

However, update lookup adds query time overhead. In addition, a changelog in UPSERT mode needs an extra changelog normalize operator, which continuously increases the state of the task. So when using this mode we should consider an external state store, such as the RocksDB state backend, to reduce memory pressure.

________

#### ALL

Starting in MongoDB 6.0, we can use the [change stream events pre and post images](https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images) feature to output the version of a document before and after a change, and use it to generate an ALL mode changelog.

We can enable pre- and post-images for a collection with the [create](https://www.mongodb.com/docs/manual/reference/command/create/#mongodb-dbcommand-dbcmd.create) or [collMod](https://www.mongodb.com/docs/manual/reference/command/collMod/#mongodb-dbcommand-dbcmd.collMod) commands:

```javascript
// Enable pre- and post-images when creating a new collection.
db.createCollection(
  "orders",
  { changeStreamPreAndPostImages: { enabled: true } }
);

// Or enable them on an existing collection.
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
})
```

Then we can set 'change-stream.full-document.strategy' to 'pre-and-post-images' to generate an ALL mode changelog.

```properties
'change-stream.full-document.strategy' = 'pre-and-post-images'
```
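To make the difference between the two strategies concrete, here is a minimal sketch of how a single change stream event could be turned into changelog rows. It is an illustration only and not the connector's implementation: `toChangelogRows` is a hypothetical helper, the row kinds (`+I`, `-U`, `+U`, `-D`) borrow Flink's changelog notation, and the event fields (`operationType`, `documentKey`, `fullDocument`, `fullDocumentBeforeChange`) are assumed to follow MongoDB's change event format.

```javascript
// Hypothetical helper, not part of the connector: maps one MongoDB change
// stream event to changelog rows for the given full-document strategy.
// Row kinds: +I insert, -U update-before, +U update-after, -D delete.
function toChangelogRows(event, strategy) {
  const { operationType, documentKey, fullDocument, fullDocumentBeforeChange } = event;
  const allMode = strategy === "pre-and-post-images";

  switch (operationType) {
    case "insert":
      return [{ kind: "+I", row: fullDocument }];

    case "update":
    case "replace":
      if (allMode) {
        // ALL mode needs both images, which assumes MongoDB 6.0+ with
        // changeStreamPreAndPostImages enabled on the collection.
        if (!fullDocumentBeforeChange || !fullDocument) {
          throw new Error("pre/post image missing for " + JSON.stringify(documentKey));
        }
        return [
          { kind: "-U", row: fullDocumentBeforeChange },
          { kind: "+U", row: fullDocument },
        ];
      }
      // UPSERT mode: only the looked-up current version is available.
      // Assumption: the lookup result can be absent if the document was
      // deleted in the meantime, so such events are skipped here.
      return fullDocument ? [{ kind: "+U", row: fullDocument }] : [];

    case "delete":
      // ALL mode can emit the full pre-image; UPSERT mode only knows the key.
      return [{ kind: "-D", row: allMode ? fullDocumentBeforeChange : documentKey }];

    default:
      // Other event types (drop, rename, invalidate, ...) are ignored in this sketch.
      return [];
  }
}

// Example: the same update event under both strategies.
const updateEvent = {
  operationType: "update",
  documentKey: { _id: "1" },
  fullDocument: { _id: "1", code: "A", quantity: 5 },
  fullDocumentBeforeChange: { _id: "1", code: "A", quantity: 3 },
};
const upsertKinds = toChangelogRows(updateEvent, "update-lookup").map((r) => r.kind);
const allKinds = toChangelogRows(updateEvent, "pre-and-post-images").map((r) => r.kind);
console.log(upsertKinds, allKinds);
```

In the UPSERT case the update yields a single `+U` row, which is why a downstream normalize step has to keep the previous version in state; in the ALL case the `-U` row is supplied by MongoDB itself.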
