hailin0 commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1188081012


##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to 
MongoDB.
+This document describes how to set up the MongoDB connector to run data 
writers against MongoDB.
 
-Write data to `MongoDB`
+Support those engines
+---------------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key featuresl
+-------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the Mongodb connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central 
repository.
+
+| MongoDB version |                                                  
dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | 
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb)
 |
+
+Data Type Mapping
+-----------------
+
+The following table lists the field data type mapping from MongoDB BSON type 
to Seatunnel data type.
+
+| Seatunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| Date           | Date              |
+| Timestamp      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1.When using SeaTunnel to write Date and Timestamp types to MongoDB, both will 
produce a Date data type in MongoDB, but the precision will be different. The 
data generated by the SeaTunnel Date type has second-level precision, while the 
data generated by the SeaTunnel Timestamp type has millisecond-level precision.
+
+2.When using the DECIMAL type in SeaTunnel, be aware that the maximum range 
cannot exceed 34 digits, which means you should use decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                      
                            Description                                         
          |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB 
connection uri.                                                                 
                   |
+| database              | required | (none)  | String   | The name of MongoDB 
database to read or write.                                                      
           |
+| collection            | required | (none)  | String   | The name of MongoDB 
collection to read or write.                                                    
           |
+| schema                | required | (none)  | String   | MongoDB's BSON and 
seatunnel data structure mapping                                                
            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the 
maximum number of buffered rows per batch request.                              
                 |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the retry 
time interval if writing records to database failed, the unit is seconds.       
           |
+| retry.max             | optional | default | String   | Specifies the max 
retry times if writing records to database failed.                              
             |
+| retry.interval        | optional | 1000    | Duration | Specifies the retry 
time interval if writing records to database failed, the unit is millisecond.   
           |
+| upsert-enable         | optional | false   | Boolean  | Whether to write 
documents via upsert mode.                                                      
              |
+| upsert-key            | optional | (none)  | List     | The primary keys for 
upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for 
properties. |
+
+How to create a MongoDB Data synchronization jobs
+-------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job 
that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = mongodb://user:[email protected]:27017
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }

Review Comment:
   ```suggestion
         }
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/config/MongodbWriterOptions.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class MongodbWriterOptions implements Serializable {

Review Comment:
   Move this class to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`.



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config: `COLLECTION` is listed twice?



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/writer/MongodbWriter.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config.MongodbWriterOptions;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

Review Comment:
   Move this class to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`.



##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to 
MongoDB.
+This document describes how to set up the MongoDB connector to run data 
writers against MongoDB.
 
-Write data to `MongoDB`
+Support those engines
+---------------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key featuresl
+-------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the Mongodb connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central 
repository.
+
+| MongoDB version |                                                  
dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | 
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb)
 |
+
+Data Type Mapping
+-----------------
+
+The following table lists the field data type mapping from MongoDB BSON type 
to Seatunnel data type.
+
+| Seatunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| Date           | Date              |
+| Timestamp      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1.When using SeaTunnel to write Date and Timestamp types to MongoDB, both will 
produce a Date data type in MongoDB, but the precision will be different. The 
data generated by the SeaTunnel Date type has second-level precision, while the 
data generated by the SeaTunnel Timestamp type has millisecond-level precision.
+
+2.When using the DECIMAL type in SeaTunnel, be aware that the maximum range 
cannot exceed 34 digits, which means you should use decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                      
                            Description                                         
          |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB 
connection uri.                                                                 
                   |
+| database              | required | (none)  | String   | The name of MongoDB 
database to read or write.                                                      
           |
+| collection            | required | (none)  | String   | The name of MongoDB 
collection to read or write.                                                    
           |
+| schema                | required | (none)  | String   | MongoDB's BSON and 
seatunnel data structure mapping                                                
            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the 
maximum number of buffered rows per batch request.                              
                 |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the retry 
time interval if writing records to database failed, the unit is seconds.       
           |
+| retry.max             | optional | default | String   | Specifies the max 
retry times if writing records to database failed.                              
             |
+| retry.interval        | optional | 1000    | Duration | Specifies the retry 
time interval if writing records to database failed, the unit is millisecond.   
           |
+| upsert-enable         | optional | false   | Boolean  | Whether to write 
documents via upsert mode.                                                      
              |
+| upsert-key            | optional | (none)  | List     | The primary keys for 
upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for 
properties. |
+
+How to create a MongoDB Data synchronization jobs
+-------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job 
that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = mongodb://user:[email protected]:27017
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }
+    }
+  }
+}
+```
 
-uri to write to mongoDB
+Parameter interpretation
+------------------------
 
-### database [string]
+**MongoDB database connection URI examples**
 
-database to write to mongoDB
+Unauthenticated single node connection:
 
-### collection [string]
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
 
-collection to write to mongoDB
+Replica set connection:
 
-### common options
+```bash
+mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
+```
 
-Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
+Authenticated replica set connection:
 
-## Example
+```bash
+mongodb://admin:[email protected]:27017/mydb?replicaSet=xxx&authSource=admin
+```
+
+Multi-node replica set connection:
 
 ```bash
-mongodb {
-    uri = 
"mongodb://username:[email protected]:27017/mypost?retryWrites=true&writeConcern=majority"
-    database = "mydatabase"
-    collection = "mycollection"
+mongodb://127.0.0..1:27017,127.0.0..2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
+```
+
+Sharded cluster connection:
+
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
+
+Multiple mongos connections:
+
+```bash
+mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
+```
+
+Note: The username and password in the URI must be URL-encoded before being 
concatenated into the connection string.
+
+**Buffer Flush**
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:[email protected]:27017"
+    database = "test_db"
+    collection = "users"
+    buffer-flush.max-rows = 2000
+    buffer-flush.interval = 1000
+    schema = {
+      fields {
+        _id = string
+        id = bigint
+        status = string
+      }
+    }
+  }
+}
+```
+
+**Why is it not recommended to use transactions for operation?**
+
+Although MongoDB has fully supported multi-document transactions since version 
4.2, it doesn't mean that everyone should use them recklessly.
+Transactions are equivalent to locks, node coordination, additional overhead, 
and performance impact.
+Instead, the principle for using transactions should be: avoid using them if 
possible.
+The necessity for using transactions can be greatly avoided by designing 
systems rationally.
+
+**Idempotent Writes**
+
+By specifying a clear primary key and using the upsert method, exactly-once 
write semantics can be achieved.
+
+If upsert-key is defined in the configuration, the MongoDB sink will use 
upsert semantics instead of regular INSERT statements. We combine the primary 
keys declared in upsert-key as the MongoDB reserved primary key and use upsert 
mode for writing to ensure idempotent writes.
+In the event of a failure, Seatunnel jobs will recover from the last 
successful checkpoint and reprocess, which may result in duplicate message 
processing during recovery. It is highly recommended to use upsert mode, as it 
helps to avoid violating database primary key constraints and generating 
duplicate data if records need to be reprocessed.
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:[email protected]:27017"
+    database = "test_db"
+    collection = "users"
+    upsert-enable = true
+    upsert-key = ["name","status"]
+    schema = {
+      fields {
+        _id = string
+        name = string
+        status = string
+      }
+    }
+  }
 }
 ```
 
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
-- Add MongoDB Sink Connector
+- Add MongoDB Source Connector
+
+### Next Version
+
+- [Feature]Refactor mongodb source 
connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCursor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+
+/** MongoReader reads MongoDB by splits (queries). */
+@Slf4j
+public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
+
+    private final Queue<MongoSplit> pendingSplits;
+
+    private final DocumentDeserializer<SeaTunnelRow> deserializer;
+
+    private final SourceReader.Context context;
+
+    private final MongodbClientProvider clientProvider;
+
+    private MongoCursor<BsonDocument> cursor;
+
+    private MongoSplit currentSplit;
+
+    private final MongodbReadOptions readOptions;
+
+    private volatile boolean noMoreSplit;
+
+    public MongodbReader(
+            SourceReader.Context context,
+            MongodbClientProvider clientProvider,
+            DocumentDeserializer<SeaTunnelRow> deserializer,
+            MongodbReadOptions mongodbReadOptions) {
+        this.deserializer = deserializer;
+        this.context = context;
+        this.clientProvider = clientProvider;
+        pendingSplits = new ConcurrentLinkedDeque<>();
+        this.readOptions = mongodbReadOptions;
+    }
+
+    @Override
+    public void open() throws Exception {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            currentSplit = pendingSplits.poll();
+            if (null != currentSplit) {
+                if (cursor != null) {
+                    // current split is in-progress
+                    return;
+                }
+                log.info("Prepared to read split {}", currentSplit.splitId());
+                FindIterable<BsonDocument> rs =
+                        clientProvider
+                                .getDefaultCollection()
+                                .find(currentSplit.getQuery())
+                                .projection(currentSplit.getProjection())
+                                .batchSize(readOptions.getFetchSize())
+                                .noCursorTimeout(readOptions.isNoCursorTimeout())
+                                .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES);
+                cursor = rs.iterator();
+                while (cursor.hasNext()) {
+                    SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
+                    output.collect(deserialize);
+                }
+                closeCurrentSplit();
+            }
+            if (noMoreSplit && pendingSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the data.
+                log.info("Closed the bounded mongodb source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List<MongoSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(Collections.singleton(currentSplit));

Review Comment:
   Should `snapshotState` also store `pendingSplits`?
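
One reading of this comment: if only `currentSplit` is snapshotted, any split still waiting in `pendingSplits` is lost on restore, and because `currentSplit` comes from `poll()` it can be null, so the returned list may hold a single null element. The sketch below illustrates the idea in isolation. `ReaderStateSketch` and `Split` are hypothetical stand-ins, not the connector's classes, and the real reader would need the same handling inside its own `snapshotState(long checkpointId)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

public class ReaderStateSketch {

    /** Hypothetical stand-in for MongoSplit; only the id matters here. */
    public static final class Split {
        private final String id;

        public Split(String id) {
            this.id = id;
        }

        public String splitId() {
            return id;
        }
    }

    private final Queue<Split> pendingSplits = new ConcurrentLinkedDeque<>();

    // null whenever no split is in progress
    private Split currentSplit;

    public void addSplits(List<Split> splits) {
        pendingSplits.addAll(splits);
    }

    /** Mirrors the reader taking the next split off the queue. */
    public void startNext() {
        currentSplit = pendingSplits.poll();
    }

    /** Snapshot the in-progress split (if any) plus everything still queued. */
    public List<Split> snapshotState() {
        List<Split> state = new ArrayList<>();
        if (currentSplit != null) {
            state.add(currentSplit);
        }
        state.addAll(pendingSplits);
        return state;
    }
}
```

With three splits added and one started, the snapshot in this sketch contains all three, with the in-progress split first.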



##########
docs/en/connector-v2/source/MongoDB.md:
##########
@@ -91,5 +446,5 @@ mongodb {
 
 ### Next Version
 
-- common-options is not a required option
+- [Feature]Refactor mongodb source 
connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java:
##########
@@ -22,31 +22,45 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import java.util.ArrayList;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URI, DATABASE, COLLECTION, CatalogTableUtil.SCHEMA)
-                .optional(MATCHQUERY)
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config `COLLECTION`?
   
   missing `uri`?
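
A small check along these lines, for example in a unit test, could catch both problems. It is only a sketch under stated assumptions: option names are plain strings here, whereas the connector passes `Option` constants to `OptionRule`, `RequiredOptionsCheck` is a hypothetical helper, and `asWritten` mirrors only the three lines visible in this truncated hunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RequiredOptionsCheck {

    /** Returns names that appear more than once, in first-seen order. */
    public static List<String> duplicates(List<String> required) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> dups = new LinkedHashSet<>();
        for (String name : required) {
            if (!seen.add(name)) {
                dups.add(name);
            }
        }
        return new ArrayList<>(dups);
    }

    /** Returns expected names that are absent from the required list. */
    public static List<String> missing(List<String> required, List<String> expected) {
        List<String> result = new ArrayList<>(expected);
        result.removeAll(required);
        return result;
    }

    public static void main(String[] args) {
        // The three entries visible in the hunk under review.
        List<String> asWritten = Arrays.asList("collection", "database", "collection");
        // Assumed intent, based on the options the old code required.
        List<String> expected = Arrays.asList("uri", "database", "collection");

        System.out.println("duplicates: " + duplicates(asWritten)); // duplicates: [collection]
        System.out.println("missing: " + missing(asWritten, expected)); // missing: [uri]
    }
}
```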



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   missing `uri` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
