Jiabao-Sun opened a new pull request, #37: URL: https://github.com/apache/flink-connector-mongodb/pull/37
As [Mongo Reference](https://www.mongodb.com/docs/manual/reference/method/db.collection.update/#upsert-on-a-sharded-collection) says: > To use db.collection.updateOne() on a sharded collection: > > - If you don't specify upsert: true, you must include an exact match on the _id field or target a single shard (such as by including the shard key in the filter). > - If you specify upsert: true, the filter must include the shard key. > > However, documents in a sharded collection can be missing the shard key fields. > To target a document that is missing the shard key, you can use the null equality match > in conjunction with another filter condition (such as on the _id field). When upsert into a sharded collection, the value of the shard key needs to be added to the filter. For example: ```javascript db.collection.updateOne( { _id: ObjectId('<value>'), shardKey0: '<value>', shardKey1: '<value>' }, { $set: { status: "D" }}, { upsert: true } ); ``` In Flink SQL, when creating a sink table, the shard keys need to be declared using the `PARTITIONED BY` syntax. The values for shard keys will be obtained from each individual record during runtime and added them into the filter. ```sql CREATE TABLE MySinkTable ( _id BIGINT, shardKey0 STRING, shardKey1 STRING, status STRING, PRIMARY KEY (_id) NOT ENFORCED ) PARTITIONED BY (shardKey0, shardKey1) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://user:[email protected]:27017', 'database' = 'my_db', 'collection' = 'users' ); -- Insert with dynamic partition INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T; -- Insert with static partition INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT'; -- Insert with static(shardKey0) and dynamic(shardKey1) partition INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1' 'INIT'; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
