This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 10c6a6f154 [Feature][Connector-V2] MongodbSink Support SaveMode (#9883)
10c6a6f154 is described below

commit 10c6a6f154731a7b5d0a20f1fa2b4272c0fd6963
Author: 老王 <[email protected]>
AuthorDate: Wed Oct 15 12:08:55 2025 +0800

    [Feature][Connector-V2] MongodbSink Support SaveMode (#9883)
---
 docs/en/connector-v2/sink/MongoDB.md               |  28 +--
 .../seatunnel/mongodb/catalog/MongodbCatalog.java  | 211 ++++++++++++++++++++
 .../mongodb/catalog/MongodbCatalogFactory.java     |  48 +++++
 .../seatunnel/mongodb/config/MongodbConfig.java    |  14 ++
 .../seatunnel/mongodb/sink/MongodbSink.java        |  32 +++-
 .../seatunnel/mongodb/sink/MongodbSinkFactory.java |  13 +-
 .../mongodb/sink/MongodbWriterOptions.java         |  18 +-
 .../sink/savemode/MongodbSaveModeHandler.java      |  39 ++++
 .../connector/v2/mongodb/AbstractMongodbIT.java    |  13 +-
 .../e2e/connector/v2/mongodb/MongodbCDCIT.java     |   4 +-
 .../e2e/connector/v2/mongodb/MongodbIT.java        | 213 +++++++++++++++++++--
 11 files changed, 598 insertions(+), 35 deletions(-)

diff --git a/docs/en/connector-v2/sink/MongoDB.md 
b/docs/en/connector-v2/sink/MongoDB.md
index 04bb548aad..d4f702bd17 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -62,19 +62,21 @@ The following table lists the field data type mapping from 
MongoDB BSON type to
 
 ## Sink Options
 
-|         Name          |   Type   | Required | Default | Description |
-|-----------------------|----------|----------|---------|-------------|
-| uri                   | String   | Yes      | -       | The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
-| database              | String   | Yes      | -       | The name of MongoDB database to read or write. |
-| collection            | String   | Yes      | -       | The name of MongoDB collection to read or write. |
-| buffer-flush.max-rows | String   | No       | 1000    | Specifies the maximum number of buffered rows per batch request. |
-| buffer-flush.interval | String   | No       | 30000   | Specifies the maximum interval of buffered rows per batch request, the unit is millisecond. |
-| retry.max             | String   | No       | 3       | Specifies the max number of retry if writing records to database failed. |
-| retry.interval        | Duration | No       | 1000    | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
-| upsert-enable         | Boolean  | No       | false   | Whether to write documents via upsert mode. |
-| primary-key           | List     | No       | -       | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
-| transaction           | Boolean  | No       | false   | Whether to use transactions in MongoSink (requires MongoDB 4.2+). |
-| common-options        |          | No       | -       | Source plugin common parameters, please refer to [Source Common Options](../sink-common-options.md) for details |
+| Name                  | Type     | Required | Default     | Description |
+|-----------------------|----------|----------|-------------|-------------|
+| uri                   | String   | Yes      | -           | The MongoDB standard connection uri, e.g. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
+| database              | String   | Yes      | -           | The name of the MongoDB database to read or write. |
+| collection            | String   | Yes      | -           | The name of the MongoDB collection to read or write. |
+| buffer-flush.max-rows | String   | No       | 1000        | Specifies the maximum number of buffered rows per batch request. |
+| buffer-flush.interval | String   | No       | 30000       | Specifies the maximum interval between batch requests for buffered rows, in milliseconds. |
+| retry.max             | String   | No       | 3           | Specifies the maximum number of retries if writing records to the database fails. |
+| retry.interval        | Duration | No       | 1000        | Specifies the retry interval if writing records to the database fails, in milliseconds. |
+| upsert-enable         | Boolean  | No       | false       | Whether to write documents via upsert mode. |
+| primary-key           | List     | No       | -           | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
+| transaction           | Boolean  | No       | false       | Whether to use transactions in MongoSink (requires MongoDB 4.2+). |
+| common-options        |          | No       | -           | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. |
+| data_save_mode        | String   | No       | APPEND_DATA | The data save mode of MongoDB. `DROP_DATA`: clear the collection before inserting data; `APPEND_DATA`: append data; `ERROR_WHEN_DATA_EXISTS`: report an error if the collection already contains data. |
+
 
 ### Tips
 
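The new `data_save_mode` option is set in the sink block of a job config. A minimal sketch, assuming placeholder connection details (the uri, database, and collection values below are illustrative, not taken from this commit):

```hocon
sink {
  MongoDB {
    # placeholder connection details
    uri = "mongodb://user:password@localhost:27017"
    database = "test_db"
    collection = "test_collection"
    # one of DROP_DATA | APPEND_DATA | ERROR_WHEN_DATA_EXISTS; defaults to APPEND_DATA
    data_save_mode = "DROP_DATA"
  }
}
```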
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
new file mode 100644
index 0000000000..ceda30fdd1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.bson.Document;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MongodbCatalog implements Catalog {
+
+    private final String catalogName;
+    private final String baseUrl;
+    private transient MongoClient mongoClient;
+    private final String defaultDatabase;
+
+    public MongodbCatalog(String catalogName, String baseUrl, String 
defaultDatabase) {
+        this.catalogName = catalogName;
+        this.baseUrl = baseUrl;
+        this.defaultDatabase = defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try {
+            if (mongoClient == null) {
+                mongoClient = MongoClients.create(baseUrl);
+            }
+        } catch (Exception e) {
+            throw new CatalogException("Failed to open MongoDB Catalog: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        try {
+            return listDatabases().contains(databaseName);
+        } catch (Exception e) {
+            throw new CatalogException("Failed to check database existence: " 
+ databaseName, e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            List<String> dbs = new ArrayList<>();
+            for (String name : mongoClient.listDatabaseNames()) {
+                dbs.add(name);
+            }
+            return dbs;
+        } catch (Exception e) {
+            throw new CatalogException("Failed to list databases", e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(name(), databaseName);
+        }
+        try {
+            MongoDatabase db = mongoClient.getDatabase(databaseName);
+            return db.listCollectionNames().into(new ArrayList<>());
+        } catch (Exception e) {
+            throw new CatalogException("Failed to list tables for database: " 
+ databaseName, e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return 
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw CommonError.unsupportedOperation(name(), "get table with 
tablePath ");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(name(), 
tablePath.getDatabaseName());
+        }
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) return;
+            throw new TableAlreadyExistException(name(), tablePath);
+        }
+        try {
+            MongoDatabase db = 
mongoClient.getDatabase(tablePath.getDatabaseName());
+            db.createCollection(tablePath.getTableName());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    "Failed to create collection: " + tablePath.getFullName(), 
e);
+        }
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) return;
+            throw new TableNotExistException(name(), tablePath);
+        }
+        try {
+            MongoDatabase db = 
mongoClient.getDatabase(tablePath.getDatabaseName());
+            db.getCollection(tablePath.getTableName()).drop();
+        } catch (Exception e) {
+            throw new CatalogException("Failed to drop collection: " + 
tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw CommonError.unsupportedOperation(name(), "create database ");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw CommonError.unsupportedOperation(name(), "drop database ");
+    }
+
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            if (!tableExists(tablePath)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException(name(), tablePath);
+            }
+            MongoDatabase db = 
mongoClient.getDatabase(tablePath.getDatabaseName());
+            MongoCollection<Document> collection = 
db.getCollection(tablePath.getTableName());
+            collection.deleteMany(new Document());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    "Failed to truncate collection: " + 
tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        try {
+            if (!tableExists(tablePath)) {
+                return false;
+            }
+            MongoDatabase db = 
mongoClient.getDatabase(tablePath.getDatabaseName());
+            MongoCollection<Document> collection = 
db.getCollection(tablePath.getTableName());
+            return collection.estimatedDocumentCount() > 0;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (mongoClient != null) {
+            mongoClient.close();
+            mongoClient = null;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
new file mode 100644
index 0000000000..05a22cd6c7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
+
+@AutoService(Factory.class)
+public class MongodbCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new MongodbCatalog(
+                catalogName, options.get(MongodbConfig.URI), 
options.get(MongodbConfig.DATABASE));
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(MongodbConfig.URI, 
MongodbConfig.DATABASE).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 848a120e27..ce7f894c87 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -19,12 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 
 import org.bson.json.JsonMode;
 import org.bson.json.JsonWriterSettings;
 
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class MongodbConfig {
 
     public static final String CONNECTOR_IDENTITY = "MongoDB";
@@ -153,4 +159,12 @@ public class MongodbConfig {
 
     public static final Option<Boolean> TRANSACTION =
             
Options.key("transaction").booleanType().defaultValue(false).withDescription(".");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, 
ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("The save mode of collection data");
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 563af55c19..fe1962b538 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -19,25 +19,35 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.catalog.MongodbCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode.MongodbSaveModeHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
 
 import java.util.Optional;
 
+import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 public class MongodbSink
         implements SeaTunnelSink<
-                SeaTunnelRow, DocumentBulk, MongodbCommitInfo, 
MongodbAggregatedCommitInfo> {
+                        SeaTunnelRow, DocumentBulk, MongodbCommitInfo, 
MongodbAggregatedCommitInfo>,
+                SupportSaveMode {
 
     private final MongodbWriterOptions options;
 
@@ -92,4 +102,24 @@ public class MongodbSink
     public Optional<CatalogTable> getWriteCatalogTable() {
         return Optional.ofNullable(catalogTable);
     }
+
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        String url = options.getConnectString();
+        String database = options.getDatabase();
+        if (catalogTable != null) {
+            Optional<Catalog> catalogOptional =
+                    Optional.of(new MongodbCatalog(CONNECTOR_IDENTITY, url, 
database));
+            try {
+                DataSaveMode dataSaveMode = options.getDataSaveMode();
+                Catalog catalog = catalogOptional.get();
+                return Optional.of(
+                        new MongodbSaveModeHandler(
+                                SchemaSaveMode.IGNORE, dataSaveMode, catalog, 
catalogTable));
+            } catch (Exception e) {
+                throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, 
e);
+            }
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index d8e2cf03bc..43569e7d36 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -19,6 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -46,7 +48,8 @@ public class MongodbSinkFactory implements TableSinkFactory {
                         MongodbConfig.RETRY_MAX,
                         MongodbConfig.RETRY_INTERVAL,
                         MongodbConfig.UPSERT_ENABLE,
-                        MongodbConfig.PRIMARY_KEY)
+                        MongodbConfig.PRIMARY_KEY,
+                        MongodbConfig.DATA_SAVE_MODE)
                 .build();
     }
 
@@ -84,6 +87,12 @@ public class MongodbSinkFactory implements TableSinkFactory {
         if (readonlyConfig.getOptional(MongodbConfig.TRANSACTION).isPresent()) 
{
             
builder.withTransaction(readonlyConfig.get(MongodbConfig.TRANSACTION));
         }
-        return () -> new MongodbSink(builder.build(), 
context.getCatalogTable());
+        
builder.withDataSaveMode(readonlyConfig.get(MongodbConfig.DATA_SAVE_MODE));
+        CatalogTable catalogTable = context.getCatalogTable();
+        // derive the sink CatalogTable from the source CatalogTable
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(CONNECTOR_IDENTITY, database, collection);
+        CatalogTable sinkCatalogTable = CatalogTable.of(tableIdentifier, 
catalogTable);
+        return () -> new MongodbSink(builder.build(), sinkCatalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
index e9b8264775..d1e4ee4aeb 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
+import org.apache.seatunnel.api.sink.DataSaveMode;
+
 import lombok.Getter;
 
 import java.io.Serializable;
@@ -46,6 +48,8 @@ public class MongodbWriterOptions implements Serializable {
 
     protected final boolean transaction;
 
+    protected final DataSaveMode dataSaveMode;
+
     public MongodbWriterOptions(
             String connectString,
             String database,
@@ -56,7 +60,8 @@ public class MongodbWriterOptions implements Serializable {
             String[] primaryKey,
             int retryMax,
             long retryInterval,
-            boolean transaction) {
+            boolean transaction,
+            DataSaveMode dataSaveMode) {
         this.connectString = connectString;
         this.database = database;
         this.collection = collection;
@@ -67,6 +72,7 @@ public class MongodbWriterOptions implements Serializable {
         this.retryMax = retryMax;
         this.retryInterval = retryInterval;
         this.transaction = transaction;
+        this.dataSaveMode = dataSaveMode;
     }
 
     public static Builder builder() {
@@ -95,6 +101,8 @@ public class MongodbWriterOptions implements Serializable {
 
         protected boolean transaction;
 
+        protected DataSaveMode dataSaveMode;
+
         public Builder withConnectString(String connectString) {
             this.connectString = connectString;
             return this;
@@ -145,6 +153,11 @@ public class MongodbWriterOptions implements Serializable {
             return this;
         }
 
+        public Builder withDataSaveMode(DataSaveMode dataSaveMode) {
+            this.dataSaveMode = dataSaveMode;
+            return this;
+        }
+
         public MongodbWriterOptions build() {
             return new MongodbWriterOptions(
                     connectString,
@@ -156,7 +169,8 @@ public class MongodbWriterOptions implements Serializable {
                     primaryKey,
                     retryMax,
                     retryInterval,
-                    transaction);
+                    transaction,
+                    dataSaveMode);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
new file mode 100644
index 0000000000..4846fded5f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+public class MongodbSaveModeHandler extends DefaultSaveModeHandler {
+    public MongodbSaveModeHandler(
+            SchemaSaveMode schemaSaveMode,
+            DataSaveMode dataSaveMode,
+            Catalog catalog,
+            CatalogTable catalogTable) {
+        super(schemaSaveMode, dataSaveMode, catalog, catalogTable, null);
+    }
+
+    public void handleSaveMode() {
+        // MongoDB does not use a schema save mode; only the data save mode is handled
+        handleDataSaveMode();
+    }
+}
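The three data save modes that the new handler delegates to (and that the e2e tests below assert) can be sketched self-contained. This is a minimal illustration only: `SaveModeSketch` and its `List`-backed collection are hypothetical stand-ins, not SeaTunnel or MongoDB driver API.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch of the data-save-mode semantics; not SeaTunnel API.
public class SaveModeSketch {
    enum DataSaveMode { DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS }

    static void handleDataSaveMode(DataSaveMode mode, List<String> collection) {
        switch (mode) {
            case DROP_DATA:
                // existing documents are removed before the first write
                collection.clear();
                break;
            case APPEND_DATA:
                // existing documents are kept; new rows are simply appended
                break;
            case ERROR_WHEN_DATA_EXISTS:
                // fail fast if the target collection already holds data
                if (!collection.isEmpty()) {
                    throw new IllegalStateException(
                            "When there exist data, an error will be reported");
                }
                break;
        }
    }

    public static void main(String[] args) {
        List<String> coll = new ArrayList<>(List.of("doc1", "doc2"));
        handleDataSaveMode(DataSaveMode.DROP_DATA, coll);
        coll.add("newDoc");
        // matches testDropDataSaveMode below: one document after the write
        System.out.println(coll.size());
    }
}
```

The e2e tests added later in this patch assert exactly these behaviors: 1 document after DROP_DATA, 3 after APPEND_DATA, and a SeaTunnelRuntimeException for ERROR_WHEN_DATA_EXISTS.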
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index cf0b1dbd09..41a96abf02 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -31,6 +31,9 @@ import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
@@ -119,7 +122,7 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
         prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET);
     }
 
-    protected void clearDate(String table) {
+    protected void clearData(String table) {
         client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
     }
 
@@ -247,6 +250,14 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
                         .withNetwork(NETWORK)
                         .withNetworkAliases(MONGODB_CONTAINER_HOST)
                         .withExposedPorts(MONGODB_PORT)
+                        .withCreateContainerCmdModifier(
+                                cmd ->
+                                        cmd.getHostConfig()
+                                                .withPortBindings(
+                                                        new PortBinding(
+                                                                Ports.Binding.bindPort(
+                                                                        MONGODB_PORT),
+                                                                new ExposedPort(MONGODB_PORT))))
                         .waitingFor(
                                 new HttpWaitStrategy()
                                         .forPort(MONGODB_PORT)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
index 6db7db4fa1..b249dc2e88 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
@@ -63,7 +63,7 @@ public class MongodbCDCIT extends AbstractMongodbIT {
                                                .map(Map.Entry::getValue)
                                                .collect(Collectors.toCollection(ArrayList::new)))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_CDC_RESULT_TABLE);
+        clearData(MONGODB_CDC_RESULT_TABLE);
     }
 
     @TestTemplate
@@ -85,6 +85,6 @@ public class MongodbCDCIT extends AbstractMongodbIT {
                                                .map(Map.Entry::getValue)
                                                .collect(Collectors.toCollection(ArrayList::new)))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_CDC_RESULT_TABLE);
+        clearData(MONGODB_CDC_RESULT_TABLE);
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index b289af315f..4a721333e9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,20 +17,48 @@
 
 package org.apache.seatunnel.e2e.connector.v2.mongodb;
 
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongoKeyExtractor;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSink;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
+import org.bson.BsonDocument;
 import org.bson.Document;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.WriteModel;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
+
 @Slf4j
 public class MongodbIT extends AbstractMongodbIT {
 
@@ -42,7 +70,7 @@ public class MongodbIT extends AbstractMongodbIT {
 
         Container.ExecResult assertResult = container.executeJob("/mongodb_source_to_assert.conf");
         Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
-        clearDate(MONGODB_SINK_TABLE);
+        clearData(MONGODB_SINK_TABLE);
     }
 
     @TestTemplate
@@ -59,8 +87,8 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_NULL_TABLE_RESULT).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_NULL_TABLE);
-        clearDate(MONGODB_NULL_TABLE_RESULT);
+        clearData(MONGODB_NULL_TABLE);
+        clearData(MONGODB_NULL_TABLE_RESULT);
     }
 
     @TestTemplate
@@ -78,7 +106,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_MATCH_RESULT_TABLE);
+        clearData(MONGODB_MATCH_RESULT_TABLE);
 
         Container.ExecResult projectionResult =
                 container.executeJob("/matchIT/mongodb_matchProjection_source_to_assert.conf");
@@ -93,7 +121,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_MATCH_RESULT_TABLE);
+        clearData(MONGODB_MATCH_RESULT_TABLE);
     }
 
     @TestTemplate
@@ -112,7 +140,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 container.executeJob("/updateIT/update_mongodb_to_assert.conf");
         Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
 
-        clearDate(MONGODB_UPDATE_TABLE);
+        clearData(MONGODB_UPDATE_TABLE);
     }
 
     @TestTemplate
@@ -126,7 +154,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 container.executeJob("/flatIT/mongodb_flat_source_to_assert.conf");
         Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
 
-        clearDate(MONGODB_FLAT_TABLE);
+        clearData(MONGODB_FLAT_TABLE);
     }
 
     @TestTemplate
@@ -144,7 +172,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_SPLIT_RESULT_TABLE);
+        clearData(MONGODB_SPLIT_RESULT_TABLE);
 
         Container.ExecResult projectionResult =
                 container.executeJob("/splitIT/mongodb_split_size_source_to_assert.conf");
@@ -158,7 +186,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_SPLIT_RESULT_TABLE);
+        clearData(MONGODB_SPLIT_RESULT_TABLE);
     }
 
     @TestTemplate
@@ -177,7 +205,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 container.executeJob("/updateIT/update_mongodb_to_assert.conf");
         Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
 
-        clearDate(MONGODB_UPDATE_TABLE);
+        clearData(MONGODB_UPDATE_TABLE);
 
         // `matchQuery` compatible test
         Container.ExecResult queryResult =
@@ -192,7 +220,7 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_MATCH_RESULT_TABLE);
+        clearData(MONGODB_MATCH_RESULT_TABLE);
     }
 
     @TestTemplate
@@ -218,8 +246,8 @@ public class MongodbIT extends AbstractMongodbIT {
         Assertions.assertEquals(
                 0, assertUpsertResult.getExitCode(), assertUpsertResult.getStderr());
 
-        clearDate(MONGODB_TRANSACTION_SINK_TABLE);
-        clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
+        clearData(MONGODB_TRANSACTION_SINK_TABLE);
+        clearData(MONGODB_TRANSACTION_UPSERT_TABLE);
     }
 
     @TestTemplate
@@ -235,6 +263,163 @@ public class MongodbIT extends AbstractMongodbIT {
                 readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream()
                         .peek(e -> e.remove("_id"))
                         .collect(Collectors.toList()));
-        clearDate(MONGODB_DOUBLE_TABLE_RESULT);
+        clearData(MONGODB_DOUBLE_TABLE_RESULT);
+    }
+
+    @SneakyThrows
+    @TestTemplate
+    public void testDropDataSaveMode(TestContainer container) {
+        // test drop data save mode
+        String collectionName = "drop_data_save_mode_coll";
+        MongoCollection<BsonDocument> collection =
+                client.getDatabase(MONGODB_DATABASE)
+                        .getCollection(collectionName, BsonDocument.class);
+        // insert one row
+        beforeInsertData(collectionName, DataSaveMode.DROP_DATA, collection);
+        // build sink
+        final MongodbSink mongoDbSink = getSinkInstance(collectionName, DataSaveMode.DROP_DATA);
+        final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer =
+                mongoDbSink.createWriter(null);
+        final Optional<SaveModeHandler> saveModeHandlerOptional = mongoDbSink.getSaveModeHandler();
+        // do save mode
+        if (saveModeHandlerOptional.isPresent()) {
+            final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get();
+            saveModeHandler.open();
+            saveModeHandler.handleSaveMode();
+            saveModeHandler.close();
+        }
+        // do write
+        writer.write(getSeaTunnelRowOne());
+        Assertions.assertEquals(1L, collection.countDocuments());
+        // clear
+        collection.drop();
+    }
+
+    @SneakyThrows
+    @TestTemplate
+    public void testAppendDataSaveMode(TestContainer container) {
+        // test append data save mode
+        String collectionName = "append_data_save_mode_coll";
+        MongoCollection<BsonDocument> collection =
+                client.getDatabase(MONGODB_DATABASE)
+                        .getCollection(collectionName, BsonDocument.class);
+        // insert one row
+        beforeInsertData(collectionName, DataSaveMode.APPEND_DATA, collection);
+        // build sink
+        final MongodbSink mongoDbSink = getSinkInstance(collectionName, DataSaveMode.APPEND_DATA);
+        final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer =
+                mongoDbSink.createWriter(null);
+        final Optional<SaveModeHandler> saveModeHandlerOptional = mongoDbSink.getSaveModeHandler();
+        // do save mode
+        if (saveModeHandlerOptional.isPresent()) {
+            final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get();
+            saveModeHandler.open();
+            saveModeHandler.handleSaveMode();
+            saveModeHandler.close();
+        }
+        // do write
+        writer.write(getSeaTunnelRowOne());
+        Assertions.assertEquals(3L, collection.countDocuments());
+        // clear
+        collection.drop();
+    }
+
+    @SneakyThrows
+    @TestTemplate
+    public void testErrorWhenDataExistsSaveMode(TestContainer container) {
+        // test error when data exists save mode
+        String collectionName = "error_data_save_mode_coll";
+        MongoCollection<BsonDocument> collection =
+                client.getDatabase(MONGODB_DATABASE)
+                        .getCollection(collectionName, BsonDocument.class);
+        // insert one row
+        beforeInsertData(collectionName, DataSaveMode.ERROR_WHEN_DATA_EXISTS, collection);
+        // build sink
+        final MongodbSink mongoDbSink =
+                getSinkInstance(collectionName, DataSaveMode.ERROR_WHEN_DATA_EXISTS);
+        final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer =
+                mongoDbSink.createWriter(null);
+        final Optional<SaveModeHandler> saveModeHandlerOptional = mongoDbSink.getSaveModeHandler();
+        // do save mode
+        if (saveModeHandlerOptional.isPresent()) {
+            final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get();
+            saveModeHandler.open();
+            Assertions.assertThrows(
+                    SeaTunnelRuntimeException.class,
+                    saveModeHandler::handleDataSaveMode,
+                    "When there exist data, an error will be reported");
+            saveModeHandler.close();
+        }
+        Assertions.assertEquals(2L, collection.countDocuments());
+        // clear
+        collection.drop();
+    }
+
+    private void beforeInsertData(
+            String collection,
+            DataSaveMode dataSaveMode,
+            MongoCollection<BsonDocument> dropDataCollection) {
+        final RowDataDocumentSerializer rowDataDocumentSerializer =
+                new RowDataDocumentSerializer(
+                        RowDataToBsonConverters.createConverter(
+                                getCatalogTable(collection).getSeaTunnelRowType()),
+                        getMongodbWriterOptions(collection, dataSaveMode),
+                        new MongoKeyExtractor(getMongodbWriterOptions(collection, dataSaveMode)));
+        WriteModel<BsonDocument> bsonDocumentWriteModelOne =
+                rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowOne());
+        WriteModel<BsonDocument> bsonDocumentWriteModelTwo =
+                rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowTwo());
+        List<WriteModel<BsonDocument>> writeModelList = new ArrayList<>();
+        writeModelList.add(bsonDocumentWriteModelOne);
+        writeModelList.add(bsonDocumentWriteModelTwo);
+        dropDataCollection.bulkWrite(writeModelList);
+    }
+
+    private SeaTunnelRow getSeaTunnelRowOne() {
+        return new SeaTunnelRow(new Object[] {1L, "A", 100});
+    }
+
+    private SeaTunnelRow getSeaTunnelRowTwo() {
+        return new SeaTunnelRow(new Object[] {2L, "B", 200});
+    }
+
+    private MongodbSink getSinkInstance(String collection, DataSaveMode dataSaveMode) {
+        return new MongodbSink(
+                getMongodbWriterOptions(collection, dataSaveMode), getCatalogTable(collection));
+    }
+
+    private MongodbWriterOptions getMongodbWriterOptions(
+            String collection, DataSaveMode dataSaveMode) {
+        String host = mongodbContainer.getContainerIpAddress();
+        int port = mongodbContainer.getFirstMappedPort();
+        String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
+        return MongodbWriterOptions.builder()
+                .withConnectString(url)
+                .withDatabase(MONGODB_DATABASE)
+                .withCollection(collection)
+                .withDataSaveMode(dataSaveMode)
+                .withFlushSize(1)
+                .build();
+    }
+
+    private CatalogTable getCatalogTable(String collection) {
+        return CatalogTable.of(
+                TableIdentifier.of(CONNECTOR_IDENTITY, MONGODB_DATABASE, collection),
+                getTableSchema(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                "");
+    }
+
+    private TableSchema getTableSchema() {
+        return new TableSchema(getColumns(), null, null);
+    }
+
+    private List<Column> getColumns() {
+        List<Column> columns = new ArrayList<>();
+        columns.add(new PhysicalColumn("c_int", BasicType.LONG_TYPE, 64L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("name", BasicType.STRING_TYPE, 100L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("score", BasicType.INT_TYPE, 32L, 0, true, "", ""));
+        return columns;
     }
 }
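For reference, a sink block in a SeaTunnel job config using the new option might look like the sketch below. The key name `data_save_mode` and the surrounding option names are assumptions based on the builder's `withDataSaveMode` and conventions of other SeaTunnel sinks; the updated docs/en/connector-v2/sink/MongoDB.md in this commit is the authoritative source for the exact keys.

```hocon
sink {
  MongoDB {
    uri = "mongodb://localhost:27017"
    database = "test_db"
    collection = "users"
    # Assumed key name; one of DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS
    data_save_mode = "DROP_DATA"
  }
}
```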
