Jiabao-Sun opened a new pull request, #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1
## What is the purpose of the change

FLIP-262: Introduce Flink MongoDB Connector
https://cwiki.apache.org/confluence/display/FLINK/FLIP-262%3A+Introduce+MongoDB+connector

## Features

- Support parallel reads and writes.
- Support lookup table source.
- Support scan table source.
- Support limit pushdown.
- Support projection pushdown.

## Documentation

### How to create a MongoDB table

```sql
CREATE TABLE test_source (
  `_id` STRING,
  `idx` INT,
  `code` STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'test',
  'collection' = 'test_source'
);

CREATE TABLE test_sink (
  `_id` STRING,
  `idx` INT,
  `code` STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'test',
  'collection' = 'test_sink'
);

INSERT INTO test_sink SELECT * FROM test_source;
```

### How to create a MongoDB Source

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

MongoSource<String> mongoSource =
        MongoSource.<String>builder()
                .setUri("mongodb://user:password@127.0.0.1:27017")
                .setDatabase("test")
                .setCollection("test_source")
                .setDeserializationSchema(new MongoJsonDeserializationSchema())
                .build();

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Source")
        .setParallelism(4)
        .print()
        .setParallelism(1);

env.execute("Print MongoDB records");
```

### How to create a MongoDB Sink

```java
MongoSink<Document> sink =
        MongoSink.<Document>builder()
                .setUri("mongodb://user:password@127.0.0.1:27017")
                .setDatabase("test")
                .setCollection("test_sink")
                .setSerializationSchema(
                        (doc, ctx) -> new InsertOneModel<>(doc.toBsonDocument()))
                .build();

// Attach the sink to an existing DataStream<Document>:
stream.sinkTo(sink);
```

### Connector Options

Option | Required | Forwarded | Default | Type | Description
-- | -- | -- | -- | -- | --
connector | required | no | (none) | String | Specify what connector to use; here it should be 'mongodb'.
uri | required | no | (none) | String | Specifies the connection URI of MongoDB.
database | required | no | (none) | String | Specifies the MongoDB database to read from or write to.
collection | required | no | (none) | String | Specifies the MongoDB collection to read from or write to.
scan.fetch-size | optional | yes | 2048 | Integer | Gives the reader a hint as to the number of documents that should be fetched from the database per round-trip when reading.
scan.cursor.batch-size | optional | yes | 0 | Integer | Specifies the number of documents to return in each batch of the response from the MongoDB instance. Set to 0 to use the server's default.
scan.cursor.no-timeout | optional | yes | true | Boolean | The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.
scan.partition.strategy | optional | no | 'default' | String | Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded and default.
scan.partition.size | optional | no | 64mb | MemorySize | Specifies the partition memory size.
scan.partition.samples | optional | no | 10 | Integer | Specifies the number of samples per partition. It only takes effect when the partition strategy is sample.
sink.bulk-flush.max-actions | optional | yes | 1000 | Integer | Specifies the maximum number of buffered actions per bulk request.
sink.bulk-flush.interval | optional | yes | 1s | Duration | Specifies the bulk flush interval.
sink.delivery-guarantee | optional | no | 'at-least-once' | String | Optional delivery guarantee when committing.
sink.max-retries | optional | yes | 3 | Integer | Specifies the max retry times if writing records to the database failed.
sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the MongoDB sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator.
lookup.max-retries | optional | yes | 3 | Integer | The max retry times if the lookup against the database failed.
lookup.partial-cache.max-rows | optional | yes | (none) | Integer | The maximum number of rows to store in the cache.
lookup.partial-cache.cache-missing-key | optional | yes | (none) | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table.
lookup.partial-cache.expire-after-access | optional | yes | (none) | Duration | Duration after which an entry in the cache expires after it was last accessed.
lookup.partial-cache.expire-after-write | optional | yes | (none) | Duration | Duration after which an entry in the cache expires after it was written.

### DataType Mapping

Bson Type | Flink SQL type
-- | --
BsonObjectId | STRING <br> CHAR <br> VARCHAR
BsonBoolean | BOOLEAN
BsonBinary | BINARY <br> VARBINARY
BsonInt32 | TINYINT <br> SMALLINT <br> INT
BsonInt64 | BIGINT
BsonDouble | FLOAT <br> DOUBLE
Decimal128 | DECIMAL
BsonDateTime | TIMESTAMP_LTZ(3)
BsonTimestamp | TIMESTAMP_LTZ(0)
BsonString | STRING
BsonSymbol | STRING
BsonRegularExpression | STRING
BsonJavaScript | STRING
BsonDbPointer | STRING <br> ROW<$ref STRING, $id STRING>
BsonDocument | ROW
BsonArray | ARRAY
[GeoJson](https://www.mongodb.com/docs/manual/reference/geojson/) | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> <br> Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> <br> ...
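The nested mappings in the DataType table (BsonDocument → ROW, BsonArray → ARRAY, GeoJSON → nested ROW) can be declared directly in table DDL. A minimal sketch, assuming a collection whose documents carry a GeoJSON point subdocument and a string array; all table and field names here are illustrative, not from this PR:

```sql
-- Hypothetical collection with nested document and array fields
CREATE TABLE nested_source (
  `_id` STRING,
  -- BsonDocument holding a GeoJSON point, mapped to a nested ROW
  `location` ROW<`type` STRING, `coordinates` ARRAY<DOUBLE>>,
  -- BsonArray of BsonString, mapped to ARRAY<STRING>
  `tags` ARRAY<STRING>,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'test',
  'collection' = 'nested_source'
);
```

Nested fields can then be addressed with dot syntax in queries, e.g. `SELECT location.coordinates FROM nested_source`.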
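Since the connector provides a lookup table source, the `lookup.*` cache options above are typically exercised through a processing-time temporal join. A hedged sketch, assuming an existing `orders` stream table with a processing-time attribute `proc_time`, and using the framework-level `lookup.cache` option from Flink's common lookup options (the `orders` table and its columns are illustrative, not part of this PR):

```sql
-- MongoDB-backed dimension table with a partial lookup cache
CREATE TABLE dim (
  `_id` STRING,
  `code` STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'test',
  'collection' = 'dim',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '1000',
  'lookup.partial-cache.expire-after-write' = '10min'
);

-- Enrich each order with the dimension row looked up at processing time
SELECT o.order_id, d.code
FROM orders AS o
LEFT JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.dim_id = d._id;
```

With the partial cache enabled, repeated lookups for the same key are served from the cache until the entry expires, which bounds the query load on MongoDB.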