Jiabao-Sun opened a new pull request, #37:
URL: https://github.com/apache/flink-connector-mongodb/pull/37

   As the [MongoDB reference](https://www.mongodb.com/docs/manual/reference/method/db.collection.update/#upsert-on-a-sharded-collection) says:
   > To use db.collection.updateOne() on a sharded collection:
   >
   > - If you don't specify upsert: true, you must include an exact match on the _id field or target a single shard (such as by including the shard key in the filter).
   > - If you specify upsert: true, the filter must include the shard key.
   >
   > However, documents in a sharded collection can be missing the shard key fields.
   > To target a document that is missing the shard key, you can use the null equality match
   > in conjunction with another filter condition (such as on the _id field).
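   To illustrate the null equality match from the quote above, here is a minimal sketch of building such a filter. `buildShardedFilter` is a hypothetical helper, not part of the connector or the MongoDB driver. It assumes the document carries its `_id` and that the shard key field names are known up front. A shard key field that the document lacks is matched with `null`, which MongoDB treats as matching both `null` and missing fields.

   ```javascript
   // Hypothetical helper (not a connector or driver API): builds an
   // updateOne() filter from _id plus every shard key field.
   function buildShardedFilter(doc, shardKeys) {
     const filter = { _id: doc._id };
     for (const key of shardKeys) {
       // Missing shard key -> null equality match, as the reference suggests.
       filter[key] = doc[key] === undefined ? null : doc[key];
     }
     return filter;
   }

   // Usage: this document has no shardKey1 field.
   const filter = buildShardedFilter(
     { _id: 1, shardKey0: 'a', status: 'D' },
     ['shardKey0', 'shardKey1']
   );
   console.log(JSON.stringify(filter));
   // prints {"_id":1,"shardKey0":"a","shardKey1":null}
   ```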
   
   When upserting into a sharded collection, the shard key values must be added to the filter.
   For example:
   ```javascript
   db.collection.updateOne(
       {
           _id: ObjectId('<value>'),
           shardKey0: '<value>',
           shardKey1: '<value>'
       },
       { $set: { status: "D" }},
       { upsert: true }
   );
   ```
   
   In Flink SQL, the shard keys of a sink table must be declared with the `PARTITIONED BY` clause when the table is created.
   At runtime, the shard key values are taken from each individual record and added to the filter.
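   The per-record behavior described above can be sketched roughly as follows. This is a hypothetical illustration, not the connector's actual code: `toUpsertRequest`, `primaryKey` and `partitionKeys` are made-up names, and the sketch assumes every shard key column is present in the row. Key columns are kept out of `$set` here as a design choice of the sketch; on an upsert that inserts, MongoDB builds the new document from the filter's equality fields plus the update, so those fields still end up in the document.

   ```javascript
   // Hypothetical sketch: turns one sink row into updateOne() upsert arguments.
   function toUpsertRequest(row, primaryKey, partitionKeys) {
     // Filter = primary key (_id) plus every PARTITIONED BY (shard key) column,
     // with values read from this particular record.
     const filter = { [primaryKey]: row[primaryKey] };
     for (const key of partitionKeys) {
       filter[key] = row[key];
     }
     // All remaining columns are written with $set.
     const keyColumns = new Set([primaryKey, ...partitionKeys]);
     const set = {};
     for (const [column, value] of Object.entries(row)) {
       if (!keyColumns.has(column)) set[column] = value;
     }
     return { filter, update: { $set: set }, options: { upsert: true } };
   }

   // Usage with a row shaped like the MySinkTable definition that follows.
   const request = toUpsertRequest(
     { _id: 1, shardKey0: 'value0', shardKey1: 'value1', status: 'D' },
     '_id',
     ['shardKey0', 'shardKey1']
   );
   console.log(JSON.stringify(request.filter)); // {"_id":1,"shardKey0":"value0","shardKey1":"value1"}
   console.log(JSON.stringify(request.update)); // {"$set":{"status":"D"}}
   ```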
   
   ```sql
   CREATE TABLE MySinkTable (
       _id       BIGINT,
       shardKey0 STRING,
       shardKey1 STRING,
       status    STRING,
       PRIMARY KEY (_id) NOT ENFORCED
   ) PARTITIONED BY (shardKey0, shardKey1) WITH (
       'connector' = 'mongodb',
       'uri' = 'mongodb://user:password@127.0.0.1:27017',
       'database' = 'my_db',
       'collection' = 'users'
   );
   
   -- Insert with dynamic partition
   INSERT INTO MySinkTable SELECT _id, shardKey0, shardKey1, status FROM T;
   
   -- Insert with static partition
   INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0', shardKey1 = 'value1') SELECT 1, 'INIT';
   
   -- Insert with static(shardKey0) and dynamic(shardKey1) partition
   INSERT INTO MySinkTable PARTITION(shardKey0 = 'value0') SELECT 1, 'value1', 'INIT';
   ```
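   How the `SELECT` values line up with the sink schema in the static-partition examples can be sketched as below, assuming Flink's usual static-partition semantics: each column named in `PARTITION(...)` takes its literal value, and the `SELECT` list fills the remaining columns in schema order. `resolveRow` is a hypothetical illustration only; the Flink planner does this alignment internally, not through a function like this.

   ```javascript
   // Hypothetical sketch: combines static PARTITION values with SELECT values
   // to produce the full row written to the sink.
   function resolveRow(schema, staticPartition, selected) {
     const row = {};
     let next = 0;
     for (const column of schema) {
       if (Object.prototype.hasOwnProperty.call(staticPartition, column)) {
         row[column] = staticPartition[column]; // static partition literal
       } else {
         if (next >= selected.length) throw new Error(`no value for column ${column}`);
         row[column] = selected[next++]; // next SELECT value, in schema order
       }
     }
     if (next !== selected.length) throw new Error('too many SELECT values');
     return row;
   }

   // Usage: static shardKey0, dynamic shardKey1.
   const schema = ['_id', 'shardKey0', 'shardKey1', 'status'];
   console.log(JSON.stringify(resolveRow(schema, { shardKey0: 'value0' }, [1, 'value1', 'INIT'])));
   // prints {"_id":1,"shardKey0":"value0","shardKey1":"value1","status":"INIT"}
   ```

   Under this reading, the resolved `shardKey0` and `shardKey1` values are the ones that end up in the upsert filter.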

