Jiabao-Sun opened a new pull request, #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1

   ## What is the purpose of the change
   
   This pull request implements FLIP-262, which introduces a Flink MongoDB connector:
   
   https://cwiki.apache.org/confluence/display/FLINK/FLIP-262%3A+Introduce+MongoDB+connector
   
   ## Features
   
     -  Support parallel read and write.
     -  Support lookup table source.
     -  Support scan table source.
     -  Support limit push-down.
     -  Support projection push-down.
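   Projection and limit push-down let the connector reduce what is read from MongoDB. It requests only the columns the query uses and can stop early, instead of fetching whole documents and discarding data in Flink. As a rough illustration (not the connector's code), the sketch below renders the MongoDB shell query that corresponds to a set of projected fields and an optional limit. `PushDownSketch` and `render` are hypothetical names. MongoDB returns `_id` in an inclusion projection unless it is excluded explicitly, so the sketch excludes it when it was not selected.

   ```java
   import java.util.List;
   import java.util.OptionalLong;
   import java.util.stream.Collectors;

   /** Hypothetical helper: shows the MongoDB query a pushed-down projection/limit corresponds to. */
   final class PushDownSketch {
       private PushDownSketch() {}

       /** Builds e.g. db.test_source.find({}, {"idx": 1, "code": 1, "_id": 0}).limit(10). */
       static String render(String collection, List<String> fields, OptionalLong limit) {
           // Inclusion projection: only the listed fields are returned by the server.
           String projection = fields.stream()
                   .map(f -> "\"" + f + "\": 1")
                   .collect(Collectors.joining(", "));
           // MongoDB returns _id by default, so exclude it when the query did not select it.
           if (!fields.contains("_id")) {
               projection = projection.isEmpty() ? "\"_id\": 0" : projection + ", \"_id\": 0";
           }
           StringBuilder query = new StringBuilder()
                   .append("db.").append(collection)
                   .append(".find({}, {").append(projection).append("})");
           // A pushed-down limit stops the cursor after n documents.
           limit.ifPresent(n -> query.append(".limit(").append(n).append(")"));
           return query.toString();
       }
   }
   ```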
   
   ## Documentation
   
   ### How to create a MongoDB table
   ```sql
   CREATE TABLE test_source (
     `_id` STRING,
     `idx` INT,
     `code` STRING,
     PRIMARY KEY (_id) NOT ENFORCED
   ) WITH (
     'connector' = 'mongodb',
     'uri' = 'mongodb://user:password@127.0.0.1:27017',
     'database' = 'test',
     'collection' = 'test_source'
   );
   
   CREATE TABLE test_sink (
     `_id` STRING,
     `idx` INT,
     `code` STRING,
     PRIMARY KEY (_id) NOT ENFORCED
   ) WITH (
     'connector' = 'mongodb',
     'uri' = 'mongodb://user:password@127.0.0.1:27017',
     'database' = 'test',
     'collection' = 'test_sink'
   );
   
   INSERT INTO test_sink SELECT * FROM test_source;
   ```
   
   ### How to create a MongoDB Source
   ```java
     StreamExecutionEnvironment env =
             StreamExecutionEnvironment.getExecutionEnvironment();
   
     // Build a source that reads the documents of test.test_source as JSON strings.
     MongoSource<String> mongoSource =
             MongoSource.<String>builder()
                     .setUri("mongodb://user:password@127.0.0.1:27017")
                     .setDatabase("test")
                     .setCollection("test_source")
                     .setDeserializationSchema(new MongoJsonDeserializationSchema())
                     .build();
   
     // Read with 4 parallel source readers; print from a single task.
     env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Source")
             .setParallelism(4)
             .print()
             .setParallelism(1);
   
     env.execute("Print MongoDB records");
   ```
   
   ### How to create a MongoDB Sink
   ```java
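     // Each incoming Document is written to test.test_sink as an insertOne operation.
     MongoSink<Document> sink =
             MongoSink.<Document>builder()
                     .setUri("mongodb://user:password@127.0.0.1:27017")
                     .setDatabase("test")
                     .setCollection("test_sink")
                     .setSerializationSchema(
                             (doc, ctx) -> new InsertOneModel<>(doc.toBsonDocument()))
                     .build();
   
     // Attach it to a DataStream<Document>, e.g. documents.sinkTo(sink);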
   ```
   
   ### Connector Options
   
   
   Option | Required | Forwarded | Default | Type | Description
   -- | -- | -- | -- | -- | --
   connector | required | no | (none) | String | Specifies which connector to use; must be 'mongodb'.
   uri | required | no | (none) | String | The connection URI of the MongoDB instance.
   database | required | no | (none) | String | The name of the MongoDB database to read from or write to.
   collection | required | no | (none) | String | The name of the MongoDB collection to read from or write to.
   scan.fetch-size | optional | yes | 2048 | Integer | Gives the reader a hint as to the number of documents that should be fetched from the database per round-trip when reading.
   scan.cursor.batch-size | optional | yes | 0 | Integer | Specifies the number of documents to return in each batch of the response from the MongoDB instance. Set to 0 to use the server's default.
   scan.cursor.no-timeout | optional | yes | true | Boolean | The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.
   scan.partition.strategy | optional | no | 'default' | String | Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded and default.
   scan.partition.size | optional | no | 64mb | MemorySize | Specifies the memory size of each partition.
   scan.partition.samples | optional | no | 10 | Integer | Specifies the number of samples to take per partition. It only takes effect when the partition strategy is sample.
   sink.bulk-flush.max-actions | optional | yes | 1000 | Integer | Specifies the maximum number of buffered actions per bulk request.
   sink.bulk-flush.interval | optional | yes | 1s | Duration | Specifies the bulk flush interval.
   sink.delivery-guarantee | optional | no | 'at-least-once' | String | Optional delivery guarantee when committing.
   sink.max-retries | optional | yes | 3 | Integer | Specifies the maximum number of retries if writing records to the database fails.
   sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the MongoDB sink operator. By default, the framework uses the same parallelism as the upstream chained operator.
   lookup.max-retries | optional | yes | 3 | Integer | The maximum number of retries if a lookup against the database fails.
   lookup.partial-cache.max-rows | optional | yes | (none) | Integer | The maximum number of rows to store in the cache.
   lookup.partial-cache.cache-missing-key | optional | yes | (none) | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table.
   lookup.partial-cache.expire-after-access | optional | yes | (none) | Duration | Duration to expire an entry in the cache after accessing.
   lookup.partial-cache.expire-after-write | optional | yes | (none) | Duration | Duration to expire an entry in the cache after writing.
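   For the `sample` partition strategy, `scan.partition.size` and `scan.partition.samples` together determine roughly how many documents are sampled to find split points. The sketch below shows that arithmetic under stated assumptions, and the connector's exact formula may differ:

   - The documents per partition are the partition size divided by the average document size.
   - The partition count is the document count divided by that figure, rounded up.
   - The sample count is the partition count multiplied by `scan.partition.samples`.

   `SamplePartitionMath` and its methods are hypothetical.

   ```java
   /** Hypothetical sketch of the sample-strategy arithmetic; not the connector's implementation. */
   final class SamplePartitionMath {
       private SamplePartitionMath() {}

       /** Documents that fit in one partition, e.g. 64 MiB / 512-byte documents = 131072. */
       static long docsPerPartition(long partitionSizeBytes, long avgDocSizeBytes) {
           if (partitionSizeBytes <= 0 || avgDocSizeBytes <= 0) {
               throw new IllegalArgumentException("sizes must be positive");
           }
           return Math.max(1, partitionSizeBytes / avgDocSizeBytes);
       }

       /** Partitions needed to cover the collection (ceiling division, at least one). */
       static long partitions(long docCount, long docsPerPartition) {
           return Math.max(1, (docCount + docsPerPartition - 1) / docsPerPartition);
       }

       /** Total documents to sample: scan.partition.samples for each partition. */
       static long samples(long partitions, int samplesPerPartition) {
           return partitions * samplesPerPartition;
       }
   }
   ```

   With the defaults (64mb, 10 samples), a collection of 1,000,000 documents averaging 512 bytes would need 8 partitions under these assumptions, so 80 documents would be sampled.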
   
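   `sink.bulk-flush.max-actions` and `sink.bulk-flush.interval` act as two flush triggers. A buffered batch is written when it reaches the action limit or when the interval has elapsed since the last flush, whichever comes first. The sketch below models that rule with a plain list and an injectable millisecond clock. `BulkBuffer` is hypothetical and stands in for the sink's real writer; it does not model retries or delivery guarantees.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;
   import java.util.function.LongSupplier;

   /** Hypothetical model of size- and time-based bulk flushing; not the connector's writer. */
   final class BulkBuffer<T> {
       private final int maxActions;            // sink.bulk-flush.max-actions
       private final long intervalMillis;       // sink.bulk-flush.interval
       private final LongSupplier clock;        // current time in milliseconds
       private final Consumer<List<T>> writer;  // receives each flushed batch
       private final List<T> buffer = new ArrayList<>();
       private long lastFlushMillis;

       BulkBuffer(int maxActions, long intervalMillis, LongSupplier clock, Consumer<List<T>> writer) {
           this.maxActions = maxActions;
           this.intervalMillis = intervalMillis;
           this.clock = clock;
           this.writer = writer;
           this.lastFlushMillis = clock.getAsLong();
       }

       /** Buffers one action and flushes if either trigger fires. */
       void add(T action) {
           buffer.add(action);
           maybeFlush();
       }

       /** Checks both triggers; a real sink would also call this from a timer. */
       void maybeFlush() {
           boolean full = buffer.size() >= maxActions;
           boolean due = clock.getAsLong() - lastFlushMillis >= intervalMillis;
           if (!buffer.isEmpty() && (full || due)) {
               flush();
           }
       }

       /** Hands a copy of the buffered actions to the writer and resets the interval. */
       void flush() {
           writer.accept(new ArrayList<>(buffer));
           buffer.clear();
           lastFlushMillis = clock.getAsLong();
       }

       int pending() {
           return buffer.size();
       }
   }
   ```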
   
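   The `lookup.partial-cache.*` options describe a bounded cache in front of the lookup source. The sketch below models three of them; `expire-after-access` is left out, and Flink's lookup cache is a separate implementation. `LookupCacheSketch` is hypothetical, and the loader function stands in for the actual database query.

   - `max-rows` bounds the cache. Here it counts keys and evicts the least recently used one, which is a simplification.
   - `expire-after-write` makes an entry stale a fixed time after it was stored.
   - With `cache-missing-key`, a key that matched no rows is cached as an empty result, so it is not looked up again.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.function.LongSupplier;

   /** Hypothetical partial lookup cache illustrating the options; not Flink's implementation. */
   final class LookupCacheSketch<K, V> {
       /** Rows cached for one key, plus the time they were written. */
       private static final class CachedRows<R> {
           final List<R> rows;
           final long writtenAtMillis;

           CachedRows(List<R> rows, long writtenAtMillis) {
               this.rows = rows;
               this.writtenAtMillis = writtenAtMillis;
           }
       }

       private final long expireAfterWriteMillis; // lookup.partial-cache.expire-after-write
       private final boolean cacheMissingKey;     // lookup.partial-cache.cache-missing-key
       private final LongSupplier clock;          // current time in milliseconds
       private final Map<K, CachedRows<V>> entries;
       private int databaseLookups;

       LookupCacheSketch(int maxRows, long expireAfterWriteMillis, boolean cacheMissingKey, LongSupplier clock) {
           this.expireAfterWriteMillis = expireAfterWriteMillis;
           this.cacheMissingKey = cacheMissingKey;
           this.clock = clock;
           // Access-ordered map: once it holds more than maxRows keys
           // (lookup.partial-cache.max-rows), the least recently used key is evicted.
           this.entries = new LinkedHashMap<K, CachedRows<V>>(16, 0.75f, true) {
               @Override
               protected boolean removeEldestEntry(Map.Entry<K, CachedRows<V>> eldest) {
                   return size() > maxRows;
               }
           };
       }

       /** Returns cached rows for the key, calling the loader on a miss or an expired entry. */
       List<V> get(K key, Function<K, List<V>> loader) {
           long now = clock.getAsLong();
           CachedRows<V> cached = entries.get(key);
           if (cached != null && now - cached.writtenAtMillis < expireAfterWriteMillis) {
               return cached.rows;
           }
           databaseLookups++;
           List<V> rows = loader.apply(key);
           if (!rows.isEmpty() || cacheMissingKey) {
               entries.put(key, new CachedRows<>(rows, now));
           } else {
               entries.remove(key); // drop any stale entry for a key that now has no rows
           }
           return rows;
       }

       int databaseLookups() {
           return databaseLookups;
       }
   }
   ```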
   ### Data Type Mapping
   BSON type | Flink SQL type
   -- | --
   BsonObjectId | STRING <br> CHAR <br> VARCHAR
   BsonBoolean | BOOLEAN
   BsonBinary | BINARY <br> VARBINARY
   BsonInt32 | TINYINT <br> SMALLINT <br> INT
   BsonInt64 | BIGINT
   BsonDouble | FLOAT <br> DOUBLE
   Decimal128 | DECIMAL
   BsonDateTime | TIMESTAMP_LTZ(3)
   BsonTimestamp | TIMESTAMP_LTZ(0)
   BsonString | STRING
   BsonSymbol | STRING
   BsonRegularExpression | STRING
   BsonJavaScript | STRING
   BsonDbPointer | STRING <br> `ROW<$ref STRING, $id STRING>`
   BsonDocument | ROW
   BsonArray | ARRAY
   [GeoJson](https://www.mongodb.com/docs/manual/reference/geojson/) | Point: `ROW<type STRING, coordinates ARRAY<DOUBLE>>` <br> LineString: `ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>` <br> ...
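   The two timestamp rows follow from how BSON stores time:

   - A `BsonDateTime` is a signed 64-bit count of milliseconds since the Unix epoch, hence `TIMESTAMP_LTZ(3)`.
   - A `BsonTimestamp` packs seconds since the epoch into its high 32 bits and an incrementing ordinal into its low 32 bits, hence `TIMESTAMP_LTZ(0)`.

   The sketch below decodes both raw values into `java.time.Instant` using only the JDK, not the MongoDB driver. This PR does not say how the connector treats the increment, so the sketch simply drops it. `BsonTimeSketch` is a hypothetical name.

   ```java
   import java.time.Instant;

   /** JDK-only sketch of decoding the raw BSON time values behind the TIMESTAMP_LTZ rows. */
   final class BsonTimeSketch {
       private BsonTimeSketch() {}

       /** BsonDateTime: int64 milliseconds since the epoch, i.e. millisecond precision. */
       static Instant fromDateTime(long epochMillis) {
           return Instant.ofEpochMilli(epochMillis);
       }

       /** BsonTimestamp: high 32 bits are seconds since the epoch, i.e. second precision. */
       static Instant fromTimestamp(long raw) {
           long seconds = raw >>> 32;             // unsigned high word
           return Instant.ofEpochSecond(seconds); // the low-word increment is dropped here
       }

       /** The low 32 bits: an ordinal that orders operations within the same second. */
       static long increment(long raw) {
           return raw & 0xFFFFFFFFL;
       }
   }
   ```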
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
