Jiabao-Sun opened a new pull request, #11:
URL: https://github.com/apache/flink-connector-mongodb/pull/11

   [Change 
streams](https://www.mongodb.com/docs/manual/changeStreams/#change-streams) 
allow applications to access real-time data changes without the complexity and 
risk of tailing the oplog. Applications can use change streams to subscribe to 
all data changes on a single collection, a database, or an entire deployment, 
and immediately react to them. Because change streams use the aggregation 
framework, applications can also filter for specific changes or transform the 
notifications at will.
   
   Change streams are available for [replica 
sets](https://www.mongodb.com/docs/manual/replication/) and [sharded 
clusters](https://www.mongodb.com/docs/manual/sharding/).
   
   We can use the MongoDB change streams feature to support unbounded streaming 
reads in the MongoDB connector.
   ```sql
   SET execution.checkpointing.interval = 3s;
   
   CREATE TABLE orders (
       `_id` STRING,
       `code` STRING,
       `quantity` BIGINT,
       PRIMARY KEY (_id) NOT ENFORCED
   ) WITH (
       'connector' = 'mongodb',
       'uri' = 'mongodb://mongodb:27017',
       'database' = 'test_unbounded',
       'collection' = 'orders',
       -- read the collection's snapshot data, then continuously read changed data
       'scan.startup.mode' = 'initial'
   );
   
   SELECT * FROM orders;
   ```
   
   ### Startup Mode
   We can determine whether the source runs in bounded or unbounded mode by 
setting the `scan.startup.mode` configuration.
   
   - bounded: read the collection's snapshot data only and do not read changed 
data.
   - initial: read the collection's snapshot data and then continuously read 
changed data.
   - latest-offset: continuously read changed data starting from the latest 
offset of the oplog.
   - timestamp: continuously read changed data starting from the specified 
timestamp offset of the oplog.
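   
   As a rough illustration of the list above, the following sketch spells out 
which phases each mode implies. `describeStartupMode` and its field names are 
hypothetical and exist only for this example; they are not part of the connector.
   ```javascript
   // Hypothetical helper, not connector API: describes which read phases
   // each `scan.startup.mode` value implies, following the list above.
   function describeStartupMode(mode) {
       switch (mode) {
           case "bounded":
               // Snapshot only; the source finishes when the snapshot is read.
               return { snapshot: true, stream: false, bounded: true, streamFrom: null };
           case "initial":
               // Snapshot first, then change events.
               return { snapshot: true, stream: true, bounded: false, streamFrom: "end of snapshot" };
           case "latest-offset":
               // No snapshot; change events from the latest oplog offset.
               return { snapshot: false, stream: true, bounded: false, streamFrom: "latest offset" };
           case "timestamp":
               // No snapshot; change events from a user-specified timestamp.
               return { snapshot: false, stream: true, bounded: false, streamFrom: "specified timestamp" };
           default:
               throw new Error(`unknown scan.startup.mode: ${mode}`);
       }
   }

   console.log(describeStartupMode("initial"));
   ```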
   
   ### Changelog Mode
   
   #### UPSERT
   Before MongoDB 6.0, pre- and post-images were not saved in the oplog.
   This means that we cannot directly obtain the complete record before and 
after a change, so we cannot generate an ALL mode changelog.
   By default, change streams only return the delta of fields for an update 
operation. However, we can configure the change stream to return the most 
current majority-committed version of the updated document by using the [update 
lookup](https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations)
 feature, and use that document to generate an UPSERT mode changelog.
   
   For MongoDB versions before 6.0, we can set 
'change-stream.full-document.strategy' to 'update-lookup', which is also the 
default.
   ```properties
   'change-stream.full-document.strategy' = 'update-lookup' 
   ```
   
   However, update lookup adds query overhead for update events. In addition, a 
changelog in UPSERT mode requires an additional changelog normalize operator, 
whose state keeps growing as the task runs. So when using this mode we should 
consider a state backend that keeps state outside of memory, such as the RocksDB 
state backend, to reduce memory pressure.
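   
   To make the UPSERT case concrete, here is a minimal sketch of how change 
events could map to upsert-style changelog rows. It is an illustration only, not 
the connector's implementation: `toUpsertRows` is a hypothetical name, the event 
fields (`operationType`, `documentKey`, `fullDocument`) follow MongoDB's change 
event format, and the kind labels follow Flink's `RowKind` short strings.
   ```javascript
   // Illustrative sketch, not the connector's implementation.
   // Maps one MongoDB change event to UPSERT-style changelog rows.
   function toUpsertRows(event) {
       switch (event.operationType) {
           case "insert":
               return [{ kind: "+I", row: event.fullDocument }];
           case "update":
           case "replace":
               // With update lookup, fullDocument is the looked-up current document.
               // It can be missing if the document was deleted before the lookup.
               if (event.fullDocument == null) return [];
               return [{ kind: "+U", row: event.fullDocument }];
           case "delete":
               // No pre-image is available, so only the document key is known.
               return [{ kind: "-D", row: event.documentKey }];
           default:
               return []; // other event types are ignored in this sketch
       }
   }

   // An update yields a single +U row; there is no -U row in this mode.
   const upsertRows = toUpsertRows({
       operationType: "update",
       documentKey: { _id: "1" },
       fullDocument: { _id: "1", code: "A", quantity: 5 }
   });
   console.log(upsertRows);
   ```
   
   Because no update-before row is emitted, a downstream operator has to keep 
the last row per key to reconstruct it, which is where the extra state comes from.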
   
   ________
   
   #### ALL
   
   Starting in MongoDB 6.0, we can use the [change stream events pre and post 
images](https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images)
 feature to output the version of a document before and after a change, and 
use it to generate an ALL mode changelog.
   
   We can enable pre- and post-images for a collection with the 
[create](https://www.mongodb.com/docs/manual/reference/command/create/#mongodb-dbcommand-dbcmd.create)
 or 
[collMod](https://www.mongodb.com/docs/manual/reference/command/collMod/#mongodb-dbcommand-dbcmd.collMod)
 command:
   ```javascript
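   // Option 1: enable pre- and post-images when creating the collection.
   db.createCollection("orders", {
       changeStreamPreAndPostImages: {
           enabled: true
       }
   });
   
   // Option 2: enable pre- and post-images on an existing collection.
   db.runCommand({
       collMod: "orders",
       changeStreamPreAndPostImages: { enabled: true }
   });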
   ```
   
   Then we can set 'change-stream.full-document.strategy' to 
'pre-and-post-images' to generate an ALL mode changelog.
   ```properties
   'change-stream.full-document.strategy' = 'pre-and-post-images' 
   ```
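   
   As a minimal sketch of the ALL case, the following shows how change events 
that carry both images could map to a full changelog. It is an illustration 
only, not the connector's implementation: `toFullChangelogRows` is a 
hypothetical name, the event fields (`fullDocument`, `fullDocumentBeforeChange`) 
follow MongoDB's change event format, and the kind labels follow Flink's 
`RowKind` short strings.
   ```javascript
   // Illustrative sketch, not the connector's implementation.
   // Maps one MongoDB change event with pre- and post-images to ALL-mode rows.
   function toFullChangelogRows(event) {
       switch (event.operationType) {
           case "insert":
               return [{ kind: "+I", row: event.fullDocument }];
           case "update":
           case "replace":
               // The pre-image gives the update-before row, the post-image the update-after row.
               return [
                   { kind: "-U", row: event.fullDocumentBeforeChange },
                   { kind: "+U", row: event.fullDocument }
               ];
           case "delete":
               // The pre-image gives the complete deleted row, not just its key.
               return [{ kind: "-D", row: event.fullDocumentBeforeChange }];
           default:
               return []; // other event types are ignored in this sketch
       }
   }

   // An update yields a -U/+U pair built from the two images.
   const allRows = toFullChangelogRows({
       operationType: "update",
       documentKey: { _id: "1" },
       fullDocumentBeforeChange: { _id: "1", code: "A", quantity: 5 },
       fullDocument: { _id: "1", code: "A", quantity: 7 }
   });
   console.log(allRows);
   ```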

