This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b6b6b0a4 [cdc] SyncAction based on MongoDB (#1809)
2b6b6b0a4 is described below

commit 2b6b6b0a4a7071d7f24ce89c1ee6b920d4db482b
Author: monster <[email protected]>
AuthorDate: Thu Aug 17 13:39:58 2023 +0800

    [cdc] SyncAction based on MongoDB (#1809)
---
 docs/content/how-to/cdc-ingestion.md               | 162 ++++++++++++++
 paimon-flink/paimon-flink-common/pom.xml           |  29 +++
 .../flink/action/cdc/mongodb/JsonParserUtils.java  | 247 +++++++++++++++++++++
 .../paimon/flink/action/cdc/mongodb/ModeEnum.java  |  25 +++
 .../action/cdc/mongodb/MongoDBActionUtils.java     | 183 +++++++++++++++
 .../action/cdc/mongodb/MongoDBRecordParser.java    |  82 +++++++
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     | 236 ++++++++++++++++++++
 .../mongodb/MongoDBSyncDatabaseActionFactory.java  | 142 ++++++++++++
 .../action/cdc/mongodb/MongoDBSyncTableAction.java | 172 ++++++++++++++
 .../cdc/mongodb/MongoDBSyncTableActionFactory.java | 136 ++++++++++++
 .../flink/action/cdc/mongodb/MongodbSchema.java    | 149 +++++++++++++
 .../mongodb/strategy/Mongo4VersionStrategy.java    |  99 +++++++++
 .../cdc/mongodb/strategy/MongoVersionStrategy.java | 133 +++++++++++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 .../action/cdc/mongodb/JsonParserUtilsTest.java    |  76 +++++++
 .../cdc/mongodb/MongoDBActionITCaseBase.java       | 202 +++++++++++++++++
 .../flink/action/cdc/mongodb/MongoDBContainer.java | 202 +++++++++++++++++
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   | 123 ++++++++++
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  | 183 +++++++++++++++
 .../test/resources/mongodb/database/test-data-1.js |  35 +++
 .../test/resources/mongodb/database/test-data-2.js |  20 ++
 .../test/resources/mongodb/database/test-data-3.js |  18 ++
 .../test/resources/mongodb/database/test-data-4.js |  18 ++
 .../src/test/resources/mongodb/docker/random.key   |  31 +++
 .../src/test/resources/mongodb/docker/setup.js     |  46 ++++
 .../test/resources/mongodb/table/inventory-1.js    |  35 +++
 .../test/resources/mongodb/table/inventory-2.js    |  18 ++
 .../test/resources/mongodb/table/inventory-3.js    |  29 +++
 .../test/resources/mongodb/table/inventory-4.js    |  16 ++
 29 files changed, 2849 insertions(+)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 4daf9cd84..7675ad697 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -36,6 +36,8 @@ We currently support the following sync ways:
 3. [API Synchronizing Table]({{< ref "/api/flink-api#cdc-ingestion-table" >}}): synchronize your custom DataStream input into one Paimon table.
 4. Kafka Synchronizing Table: synchronize one Kafka topic's table into one Paimon table.
 5. Kafka Synchronizing Database: synchronize one Kafka topic containing multiple tables or multiple topics containing one table each into one Paimon database.
+6. MongoDB Synchronizing Collection: synchronize one collection from MongoDB into one Paimon table.
+7. MongoDB Synchronizing Database: synchronize the whole MongoDB database into one Paimon database.
 
 ## MySQL
 
@@ -415,6 +417,166 @@ Synchronization from multiple Kafka topics to Paimon database.
     --table-conf changelog-producer=input \
     --table-conf sink.parallelism=4
 ```
+## MongoDB
+
+### Prepare MongoDB Bundled Jar
+
+```
+flink-sql-connector-mongodb-*.jar
+```
+
+### Synchronizing Tables
+
+By using [MongoDBSyncTableAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one collection from MongoDB into one Paimon table.
+
+To use this feature through `flink run`, run the following shell command.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mongodb-sync-table \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    [--partition-keys <partition-keys>] \
+    [--mongodb-conf <mongodb-cdc-source-conf> [--mongodb-conf <mongodb-cdc-source-conf> ...]] \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
+    [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
+```
+Here are a few points to take note of:
+
+1. The "mongodb-conf" introduces the "schema.start.mode" parameter on top of the MongoDB CDC source configuration. "schema.start.mode" provides two modes: "dynamic" (default) and "specified".
+In "dynamic" mode, only the top-level fields of each MongoDB document are parsed, and these fields form the basis for schema change evolution.
+In "specified" mode, only the fields that the user lists are synchronized.
+This is done by configuring "field.name" to specify the synchronization fields and "parser.path" to specify the JSON parsing path for those fields.
+The difference between the two is that "specified" mode requires the user to explicitly identify the fields to be used and creates the mapping table based on those fields.
+"dynamic" mode, on the other hand, ensures that Paimon and MongoDB always keep the top-level fields consistent, eliminating the need to focus on specific fields.
+In "dynamic" mode, further processing of the data table is required when using values from nested fields.
+
+2. The synchronized table is required to have its primary key set as `_id`.
+This is because MongoDB's change events do not carry the before-update image of a document.
+Consequently, we can only convert them into Flink's UPSERT changelog stream.
+The upsert stream demands a unique key, which is why we must declare `_id` as the primary key.
+Declaring other columns as primary keys is not feasible, as delete operations only encompass the `_id` and sharding key, excluding other keys and values.
+
+3. MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database, and one of its core features is the dynamic schema, where documents can contain different fields, and the data types of fields can be flexible. Therefore, the absence of data type definitions in Change Streams is to maintain this flexibility and extensibility.
+For this reason, we have set all field data types for synchronizing MongoDB to Paimon as String to address the issue of not being able to obtain data types.
+
+{{< generated/mongodb_sync_table >}}
+
+If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from the MongoDB collection.
+
+Example 1: synchronize collection into one Paimon table
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mongodb-sync-table \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --partition-keys pt \
+    --mongodb-conf hosts=127.0.0.1:27017 \
+    --mongodb-conf username=root \
+    --mongodb-conf password=123456 \
+    --mongodb-conf database=source_db \
+    --mongodb-conf collection=source_table1 \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4
+```
+
+Example 2: synchronize a collection into a Paimon table according to the specified field mapping.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mongodb-sync-table \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --partition-keys pt \
+    --mongodb-conf hosts=127.0.0.1:27017 \
+    --mongodb-conf username=root \
+    --mongodb-conf password=123456 \
+    --mongodb-conf database=source_db \
+    --mongodb-conf collection=source_table1 \
+    --mongodb-conf schema.start.mode=specified \
+    --mongodb-conf field.name=_id,name,description \
+    --mongodb-conf parser.path=_id,name,description \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4
+```
+
+### Synchronizing Databases
+
+By using [MongoDBSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the whole MongoDB database into one Paimon database.
+
+To use this feature through `flink run`, run the following shell command.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mongodb-sync-database \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    [--table-prefix <paimon-table-prefix>] \
+    [--table-suffix <paimon-table-suffix>] \
+    [--including-tables <mongodb-table-name|name-regular-expr>] \
+    [--excluding-tables <mongodb-table-name|name-regular-expr>] \
+    [--mongodb-conf <mongodb-cdc-source-conf> [--mongodb-conf <mongodb-cdc-source-conf> ...]] \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
+    [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
+```
+
+{{< generated/mongodb_sync_database >}}
+
+All collections to be synchronized need to set `_id` as the primary key.
+For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table.
+Its schema will be derived from all specified MongoDB collections. If the Paimon table already exists, its schema will be compared against the schema of all specified MongoDB collections.
+Any MongoDB collections created after the commencement of the task will automatically be included.
+
+Example 1: synchronize entire database
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mongodb-sync-database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --mongodb-conf hosts=127.0.0.1:27017 \
+    --mongodb-conf username=root \
+    --mongodb-conf password=123456 \
+    --mongodb-conf database=source_db \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4
+```
+Example 2: synchronize the specified collections.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+--fromSavepoint savepointPath \
+/path/to/paimon-flink-action-{{< version >}}.jar \
+mongodb-sync-database \
+--warehouse hdfs:///path/to/warehouse \
+--database test_db \
+--mongodb-conf hosts=127.0.0.1:27017 \
+--mongodb-conf username=root \
+--mongodb-conf password=123456 \
+--mongodb-conf database=source_db \
+--catalog-conf metastore=hive \
+--catalog-conf uri=thrift://hive-metastore:9083 \
+--table-conf bucket=4 \
+--including-tables 'product|user|address|order|custom'
+```
 
 ## Schema Change Evolution
 
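Note on the "specified" mode documented in the MongoDB section above: the docs pass `field.name` and `parser.path` as two comma-separated lists. The sketch below shows one plausible reading, in which the i-th field name is paired with the i-th parsing path. The positional pairing, the class name `SpecifiedModeMapping`, and the nested path `address.city` are assumptions made for illustration; they are not taken from this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the commit. */
class SpecifiedModeMapping {

    /** Pairs the i-th field name with the i-th parser path (assumed semantics). */
    static Map<String, String> pair(String fieldNames, String parserPaths) {
        String[] names = fieldNames.split(",");
        String[] paths = parserPaths.split(",");
        if (names.length != paths.length) {
            throw new IllegalArgumentException(
                    "field.name and parser.path must list the same number of entries");
        }
        // LinkedHashMap keeps the column order given on the command line.
        Map<String, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            mapping.put(names[i].trim(), paths[i].trim());
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Values taken from "Example 2" of the table synchronization docs.
        System.out.println(pair("_id,name,description", "_id,name,description"));
    }
}
```

Rejecting lists of different lengths up front is a design choice of this sketch; it surfaces a misconfigured mapping before any job is submitted.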
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index 4d84694d2..53ff1344d 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -38,6 +38,8 @@ under the License.
         <flink.cdc.version>2.3.0</flink.cdc.version>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
         <geometry.version>2.2.0</geometry.version>
+        <avro.version>1.11.1</avro.version>
+        <mongodb.testcontainers.version>1.18.3</mongodb.testcontainers.version>
     </properties>
 
     <dependencies>
@@ -91,6 +93,26 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mongodb-cdc</artifactId>
+            <version>2.4.1</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- cdc dependencies end -->
 
         <dependency>
@@ -267,6 +289,13 @@ under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mongodb</artifactId>
+            <version>${mongodb.testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
new file mode 100644
index 000000000..61340542d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.type.MapType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Extracts a JSON object from a JSON string based on the JSON path specified, and returns the JSON
+ * string of the extracted object. Returns null if the input JSON string is invalid. A limited
+ * version of JSONPath is supported: $ : Root object . : Child operator [] : Subscript operator for
+ * array * : Wildcard for []. Syntax not supported that's worth noticing: '' : Zero length string
+ * as key .. : Recursive descent &#064; : Current object/element () : Script expression ?() :
+ * Filter (script) expression. [,] : Union operator [start:end:step] : array slice operator.
+ */
+@SuppressWarnings("unchecked")
+public class JsonParserUtils implements Serializable {
+
+    private static final Pattern patternKey = Pattern.compile("^([a-zA-Z0-9_\\-:\\s]+).*");
+    private static final Pattern patternIndex = Pattern.compile("\\[([0-9]+|\\*)]");
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+
+    static {
+        // Allows for unescaped ASCII control characters in JSON values
+        JSON_FACTORY.enable(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
+    }
+
+    private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
+    private static final MapType MAP_TYPE =
+            MAPPER.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
+
+    // A bounded cache backed by a LinkedHashMap; it evicts in insertion order (FIFO), not LRU order
+    static class HashCache<K, V> extends LinkedHashMap<K, V> {
+
+        private static final int CACHE_SIZE = 16;
+        private static final int INIT_SIZE = 32;
+        private static final float LOAD_FACTOR = 0.6f;
+
+        HashCache() {
+            super(INIT_SIZE, LOAD_FACTOR);
+        }
+
+        private static final long serialVersionUID = 1;
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+            return size() > CACHE_SIZE;
+        }
+    }
+
+    static Map<String, Object> extractObjectCache = new HashCache<>();
+    static Map<String, String[]> pathExprCache = new HashCache<>();
+    static Map<String, ArrayList<String>> indexListCache = new HashCache<>();
+    static Map<String, String> mKeyGroupCache = new HashCache<>();
+    static Map<String, Boolean> mKeyMatchesCache = new HashCache<>();
+
+    public static String evaluate(String jsonString, String pathString) {
+
+        if (jsonString == null
+                || jsonString.equals("")
+                || pathString == null
+                || pathString.equals("")) {
+            return null;
+        }
+
+        // Cache pathExpr
+        String[] pathExpr = pathExprCache.computeIfAbsent(pathString, s -> s.split("\\.", -1));
+
+        if (!pathExpr[0].equalsIgnoreCase("$")) {
+            return null;
+        }
+        // Cache extractObject
+        Object extractObject = extractObjectCache.get(jsonString);
+        if (extractObject == null) {
+            try {
+                extractObject = MAPPER.readValue(jsonString, MAP_TYPE);
+            } catch (Exception e) {
+                return null;
+            }
+            extractObjectCache.put(jsonString, extractObject);
+        }
+        for (int i = 1; i < pathExpr.length; i++) {
+            if (extractObject == null) {
+                return null;
+            }
+            extractObject = extract(extractObject, pathExpr[i]);
+        }
+        if (extractObject instanceof Map || extractObject instanceof List) {
+            try {
+                return MAPPER.writeValueAsString(extractObject);
+            } catch (Exception e) {
+                return null;
+            }
+        } else if (extractObject != null) {
+            return extractObject.toString();
+        } else {
+            return null;
+        }
+    }
+
+    private static Object extract(Object json, String path) {
+
+        // Cache patternkey.matcher(path).matches()
+        Matcher mKey = null;
+        Boolean mKeyMatches = mKeyMatchesCache.get(path);
+        if (mKeyMatches == null) {
+            mKey = patternKey.matcher(path);
+            mKeyMatches = mKey.matches() ? Boolean.TRUE : Boolean.FALSE;
+            mKeyMatchesCache.put(path, mKeyMatches);
+        }
+        if (!mKeyMatches) {
+            return null;
+        }
+
+        // Cache mkey.group(1)
+        String mKeyGroup1 = mKeyGroupCache.get(path);
+        if (mKeyGroup1 == null) {
+            if (mKey == null) {
+                mKey = patternKey.matcher(path);
+            }
+            mKeyGroup1 = mKey.group(1);
+            mKeyGroupCache.put(path, mKeyGroup1);
+        }
+        json = extract_json_key(json, mKeyGroup1);
+
+        // Cache indexList
+        ArrayList<String> indexList = indexListCache.get(path);
+        if (indexList == null) {
+            Matcher mIndex = patternIndex.matcher(path);
+            indexList = new ArrayList<>();
+            while (mIndex.find()) {
+                indexList.add(mIndex.group(1));
+            }
+            indexListCache.put(path, indexList);
+        }
+
+        if (indexList.size() > 0) {
+            json = extract_json_withIndex(json, indexList);
+        }
+
+        return json;
+    }
+
+    private static Object extract_json_withIndex(Object json, ArrayList<String> indexList) {
+        List<Object> jsonList = new ArrayList<>();
+        jsonList.add(json);
+        for (String index : indexList) {
+            List<Object> tmpJsonList = new ArrayList<>();
+            if (index.equalsIgnoreCase("*")) {
+                for (Object array : jsonList) {
+                    if (array instanceof List) {
+                        tmpJsonList.addAll((List<Object>) array);
+                    }
+                }
+                jsonList = tmpJsonList;
+            } else {
+                for (int i = 0; i < (jsonList).size(); i++) {
+                    Object array = jsonList.get(i);
+                    int indexValue = Integer.parseInt(index);
+                    if (!(array instanceof List)) {
+                        continue;
+                    }
+                    if (indexValue >= ((List<Object>) array).size()) {
+                        return null;
+                    }
+                    tmpJsonList.add(((List<Object>) array).get(indexValue));
+                    jsonList = tmpJsonList;
+                }
+            }
+        }
+        if (jsonList.isEmpty()) {
+            return null;
+        }
+        return (jsonList.size() > 1) ? new ArrayList<>(jsonList) : jsonList.get(0);
+    }
+
+    private static Object extract_json_key(Object json, String path) {
+        if (json instanceof List) {
+            List<Object> jsonArray = new ArrayList<>();
+            for (int i = 0; i < ((List<Object>) json).size(); i++) {
+                Object jsonElem = ((List<Object>) json).get(i);
+                Object jsonObj;
+                if (jsonElem instanceof Map) {
+                    jsonObj = ((Map<String, Object>) jsonElem).get(path);
+                } else {
+                    continue;
+                }
+                if (jsonObj instanceof List) {
+                    jsonArray.addAll((List<Object>) jsonObj);
+                } else if (jsonObj != null) {
+                    jsonArray.add(jsonObj);
+                }
+            }
+            return (jsonArray.size() == 0) ? null : jsonArray;
+        } else if (json instanceof Map) {
+            return ((Map<String, Object>) json).get(path);
+        } else {
+            return null;
+        }
+    }
+
+    public static LinkedHashMap<String, String> extractMap(String jsonString) {
+        try {
+            LinkedHashMap<String, ?> originalMap = MAPPER.readValue(jsonString, MAP_TYPE);
+            LinkedHashMap<String, String> stringMap = new LinkedHashMap<>();
+
+            originalMap.forEach(
+                    (key, value) -> {
+                        String stringValue = Objects.toString(value, null);
+                        stringMap.put(key.toLowerCase(), stringValue);
+                    });
+
+            return stringMap;
+
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
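Note on `HashCache` in JsonParserUtils.java above: it calls the two-argument `LinkedHashMap` constructor, which iterates (and therefore evicts) in insertion order. The sketch below contrasts that with the three-argument constructor and `accessOrder = true`, which gives least-recently-used eviction. `BoundedCacheDemo` and its methods are hypothetical names used only for this illustration; nothing here is part of the commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical demo class, not part of the commit. */
class BoundedCacheDemo {

    /** Bounded map: accessOrder=false evicts by insertion order, true by least-recent access. */
    static <K, V> Map<K, V> bounded(int maxSize, boolean accessOrder) {
        return new LinkedHashMap<K, V>(16, 0.75f, accessOrder) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Inserts a and b, reads a, then inserts c into a cache of size 2; returns surviving keys. */
    static String survivors(boolean accessOrder) {
        Map<String, Integer> cache = bounded(2, accessOrder);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a"); // touch "a" so that it is the most recently used entry
        cache.put("c", 3); // exceeds the bound, so the eldest entry is evicted
        return String.join(",", cache.keySet());
    }

    public static void main(String[] args) {
        System.out.println("insertion order: " + survivors(false)); // "a" is evicted
        System.out.println("access order:    " + survivors(true)); // "b" is evicted
    }
}
```

Whether insertion-order eviction is acceptable here depends on the access pattern. For a small cache keyed by path fragments it may make little practical difference.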
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
new file mode 100644
index 000000000..926999fb1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+/** schema acquisition mode. */
+public enum ModeEnum {
+    SPECIFIED,
+    DYNAMIC
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
new file mode 100644
index 000000000..1b062a8a1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.bson.Document;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Utils for MongoDB Action. */
+public class MongoDBActionUtils {
+
+    public static final ConfigOption<String> FIELD_NAME =
+            ConfigOptions.key("field.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Set the field names to be synchronized in the `specified` mode.");
+
+    public static final ConfigOption<String> PARSER_PATH =
+            ConfigOptions.key("parser.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure the JSON parsing path for synchronizing field values in the `specified` mode.");
+
+    public static final ConfigOption<String> START_MODE =
+            ConfigOptions.key("schema.start.mode")
+                    .stringType()
+                    .defaultValue("dynamic")
+                    .withDescription("Can choose between the `dynamic` and `specified` modes.");
+
+    static MongoDBSource<String> buildMongodbSource(Configuration mongodbConfig, String tableList) {
+        validateMongodbConfig(mongodbConfig);
+        MongoDBSourceBuilder<String> sourceBuilder = MongoDBSource.builder();
+
+        if (mongodbConfig.contains(MongoDBSourceOptions.USERNAME)
+                && mongodbConfig.contains(MongoDBSourceOptions.PASSWORD)) {
+            sourceBuilder
+                    .username(mongodbConfig.get(MongoDBSourceOptions.USERNAME))
+                    .password(mongodbConfig.get(MongoDBSourceOptions.PASSWORD));
+        }
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.CONNECTION_OPTIONS))
+                .ifPresent(sourceBuilder::connectionOptions);
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.BATCH_SIZE))
+                .ifPresent(sourceBuilder::batchSize);
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
+                .ifPresent(sourceBuilder::heartbeatIntervalMillis);
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.SCHEME))
+                .ifPresent(sourceBuilder::scheme);
+
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.POLL_MAX_BATCH_SIZE))
+                .ifPresent(sourceBuilder::pollMaxBatchSize);
+
+        Optional.ofNullable(mongodbConfig.get(MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS))
+                .ifPresent(sourceBuilder::pollAwaitTimeMillis);
+
+        sourceBuilder
+                .hosts(mongodbConfig.get(MongoDBSourceOptions.HOSTS))
+                .databaseList(mongodbConfig.get(MongoDBSourceOptions.DATABASE))
+                .collectionList(tableList);
+
+        String startupMode = mongodbConfig.get(SourceOptions.SCAN_STARTUP_MODE);
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.initial());
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.latest());
+        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(
+                    StartupOptions.timestamp(
+                            mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+        }
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        return sourceBuilder.deserializer(schema).build();
+    }
+
+    private static void validateMongodbConfig(Configuration mongodbConfig) {
+        checkArgument(
+                mongodbConfig.get(MongoDBSourceOptions.HOSTS) != null,
+                String.format(
+                        "mongodb-conf [%s] must be specified.", MongoDBSourceOptions.HOSTS.key()));
+
+        checkArgument(
+                mongodbConfig.get(MongoDBSourceOptions.DATABASE) != null,
+                String.format(
+                        "mongodb-conf [%s] must be specified.",
+                        MongoDBSourceOptions.DATABASE.key()));
+    }
+
+    static Schema buildPaimonSchema(
+            MongodbSchema mongodbSchema,
+            List<String> specifiedPartitionKeys,
+            Map<String, String> paimonConfig,
+            boolean caseSensitive) {
+        Schema.Builder builder = Schema.newBuilder();
+        builder.options(paimonConfig);
+
+        Map<String, DataType> mongodbFields;
+        if (caseSensitive) {
+            mongodbFields = mongodbSchema.fields();
+        } else {
+            mongodbFields = new LinkedHashMap<>();
+            for (Map.Entry<String, DataType> entry : 
mongodbSchema.fields().entrySet()) {
+                String fieldName = entry.getKey();
+                checkArgument(
+                        !mongodbFields.containsKey(fieldName.toLowerCase()),
+                        String.format(
+                                "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
+                                fieldName, mongodbSchema.tableName()));
+                mongodbFields.put(fieldName.toLowerCase(), entry.getValue());
+            }
+        }
+
+        for (Map.Entry<String, DataType> entry : mongodbFields.entrySet()) {
+            builder.column(entry.getKey(), entry.getValue());
+        }
+
+        builder.primaryKey(Lists.newArrayList("_id"));
+
+        if (specifiedPartitionKeys.size() > 0) {
+            builder.partitionKeys(specifiedPartitionKeys);
+        }
+
+        return builder.build();
+    }
+
+    public static int getMongoDBVersion(Configuration mongodbConfig) {
+        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
+        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+
+        String url = String.format("mongodb://%s/%s", hosts, databaseName);
+        try (MongoClient mongoClient = MongoClients.create(url)) {
+            MongoDatabase database = mongoClient.getDatabase(databaseName);
+            Document buildInfo = database.runCommand(new Document("buildInfo", 1));
+            String[] split = ((String) buildInfo.get("version")).split("\\.");
+            return Integer.parseInt(split[0]);
+        }
+    }
+}
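
The case-insensitive branch of `buildPaimonSchema` above lower-cases every field name and rejects collisions. A minimal standalone sketch of that idea follows, assuming field types are plain strings rather than Paimon `DataType`; the names `CaseFoldSketch` and `foldKeys` are hypothetical and not part of this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CaseFoldSketch {

    /**
     * Returns a copy of {@code fields} with lower-cased keys, preserving insertion order.
     * Throws if two source keys collapse to the same lower-case name.
     */
    static Map<String, String> foldKeys(Map<String, String> fields) {
        Map<String, String> folded = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            String key = entry.getKey().toLowerCase();
            if (folded.containsKey(key)) {
                throw new IllegalArgumentException(
                        "Duplicate key '" + entry.getKey() + "' after lower-casing");
            }
            folded.put(key, entry.getValue());
        }
        return folded;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("_id", "STRING");
        fields.put("UserName", "STRING");
        // Keys of the folded map are "_id" and "username".
        System.out.println(foldKeys(fields).keySet());
    }
}
```

A document containing both `Name` and `name` would fail here, which mirrors the `checkArgument` in the patch.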
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
new file mode 100644
index 000000000..b0c06a627
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
+import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/** Convert MongoDB Debezium JSON string to list of {@link 
RichCdcMultiplexRecord}s. */
+public class MongoDBRecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
+
+    private static final String FIELD_DATABASE = "db";
+    private static final String FIELD_TABLE = "coll";
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final boolean caseSensitive;
+    private final TableNameConverter tableNameConverter;
+    private final Configuration mongodbConfig;
+    private JsonNode root;
+
+    public MongoDBRecordParser(boolean caseSensitive, Configuration 
mongodbConfig) {
+
+        this(caseSensitive, new TableNameConverter(caseSensitive), 
mongodbConfig);
+    }
+
+    public MongoDBRecordParser(
+            boolean caseSensitive,
+            TableNameConverter tableNameConverter,
+            Configuration mongodbConfig) {
+        this.caseSensitive = caseSensitive;
+        this.tableNameConverter = tableNameConverter;
+        this.mongodbConfig = mongodbConfig;
+    }
+
+    @Override
+    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) 
throws Exception {
+        root = objectMapper.readValue(value, JsonNode.class);
+        String databaseName = extractString(FIELD_DATABASE);
+        String collection = 
tableNameConverter.convert(extractString(FIELD_TABLE));
+        MongoVersionStrategy versionStrategy =
+                new Mongo4VersionStrategy(databaseName, collection, 
caseSensitive, mongodbConfig);
+
+        // TODO:Upgrade mongodb cdc to 2.5 and enable scanFullChangelog 
capability
+        // [Full Changelog]
+        // 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html
+        //        if (mongodbVersion >= 6) {
+        //            versionStrategy = new 
Mongo6VersionStrategy(databaseName, collection,
+        // caseSensitive);
+        //        } else {
+        //            versionStrategy = new 
Mongo4VersionStrategy(databaseName, collection,
+        // caseSensitive);
+        //        }
+        versionStrategy.extractRecords(root).forEach(out::collect);
+    }
+
+    private String extractString(String key) {
+        return root.get("ns").get(key).asText();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
new file mode 100644
index 000000000..82c879d89
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
+
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * An {@link Action} which synchronizes the whole MongoDB database into one Paimon database.
+ *
+ * <p>You should specify the MongoDB source database in {@code mongodbConfig}. See <a
+ * href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document
+ * of flink-cdc-connectors</a> for detailed keys and values.
+ *
+ * <p>For each MongoDB collection to be synchronized, if the corresponding Paimon table does not
+ * exist, this action will automatically create the table. Its schema will be derived from all
+ * specified MongoDB collections. If the Paimon table already exists, its schema will be compared
+ * against the schema of all specified MongoDB collections.
+ *
+ * <p>This action supports a limited number of schema changes. Currently, the framework cannot drop
+ * columns, so `DROP` is ignored and `RENAME` adds a new column. Currently
+ * supported schema changes include:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ * </ul>
+ *
+ * <p>To automatically synchronize new tables, this action creates a single sink for all Paimon
+ * tables to be written. See {@link DatabaseSyncMode#COMBINED}.
+ */
+public class MongoDBSyncDatabaseAction extends ActionBase {
+
+    private final Configuration mongodbConfig;
+    private final String database;
+    private final String tablePrefix;
+    private final String tableSuffix;
+    private final Map<String, String> tableConfig;
+    @Nullable private final Pattern includingPattern;
+    @Nullable private final Pattern excludingPattern;
+    @Nullable private final String includingTables;
+
+    public MongoDBSyncDatabaseAction(
+            Map<String, String> mongodbConfig,
+            String warehouse,
+            String database,
+            Map<String, String> catalogConfig,
+            Map<String, String> tableConfig) {
+        this(
+                mongodbConfig,
+                warehouse,
+                database,
+                null,
+                null,
+                null,
+                null,
+                catalogConfig,
+                tableConfig);
+    }
+
+    public MongoDBSyncDatabaseAction(
+            Map<String, String> mongodbConfig,
+            String warehouse,
+            String database,
+            @Nullable String tablePrefix,
+            @Nullable String tableSuffix,
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
+            Map<String, String> catalogConfig,
+            Map<String, String> tableConfig) {
+        super(warehouse, catalogConfig);
+        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+        this.database = database;
+        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
+        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
+        this.includingTables = includingTables == null ? ".*" : 
includingTables;
+        this.includingPattern = Pattern.compile(this.includingTables);
+        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        this.tableConfig = tableConfig;
+    }
+
+    public void build(StreamExecutionEnvironment env) throws Exception {
+        boolean caseSensitive = catalog.caseSensitive();
+
+        if (!caseSensitive) {
+            validateCaseInsensitive();
+        }
+
+        catalog.createDatabase(database, true);
+        TableNameConverter tableNameConverter =
+                new TableNameConverter(caseSensitive, true, tablePrefix, 
tableSuffix);
+        List<Identifier> excludedTables = new ArrayList<>();
+
+        MongoDBSource<String> source =
+                MongoDBActionUtils.buildMongodbSource(
+                        mongodbConfig, buildTableList(excludedTables));
+
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
+        RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
+                new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+        Pattern includingPattern = this.includingPattern;
+        Pattern excludingPattern = this.excludingPattern;
+        parserFactory =
+                () ->
+                        new RichCdcMultiplexRecordEventParser(
+                                schemaBuilder, includingPattern, 
excludingPattern);
+        FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+                new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+                        .withInput(
+                                env.fromSource(
+                                                source,
+                                                
WatermarkStrategy.noWatermarks(),
+                                                "MongoDB Source")
+                                        .flatMap(
+                                                new MongoDBRecordParser(
+                                                        caseSensitive, tableNameConverter, mongodbConfig)))
+                        .withParserFactory(parserFactory)
+                        .withCatalogLoader(catalogLoader())
+                        .withDatabase(database)
+                        .withMode(DatabaseSyncMode.COMBINED);
+        String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+        sinkBuilder.build();
+    }
+
+    private void validateCaseInsensitive() {
+        checkArgument(
+                database.equals(database.toLowerCase()),
+                String.format(
+                        "Database name [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        database));
+        checkArgument(
+                tablePrefix.equals(tablePrefix.toLowerCase()),
+                String.format(
+                        "Table prefix [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        tablePrefix));
+        checkArgument(
+                tableSuffix.equals(tableSuffix.toLowerCase()),
+                String.format(
+                        "Table suffix [%s] cannot contain upper case in 
case-insensitive catalog.",
+                        tableSuffix));
+    }
+
+    private String buildTableList(List<Identifier> excludedTables) {
+        String separatorRex = "\\.";
+        // In COMBINED mode, we should consider both existing tables and possible newly added
+        // tables, so we should use a regular expression to monitor all valid tables and exclude
+        // certain invalid tables
+
+        // The table list is built by template:
+        // (?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
+
+        // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
+        // is exactly equal to 'db.tbl'
+        // The including pattern databasePattern\\.(including_pattern1|...) 
can include tables
+        // whose qualified name matches one of the patterns
+
+        // a table can be monitored only when its name meets the including pattern and is not
+        // excluded by the excluding pattern at the same time
+        String includingPattern =
+                String.format(
+                        "%s%s(%s)",
+                        mongodbConfig.get(MongoDBSourceOptions.DATABASE),
+                        separatorRex,
+                        includingTables);
+        if (excludedTables.isEmpty()) {
+            return includingPattern;
+        }
+
+        String excludingPattern =
+                excludedTables.stream()
+                        .map(
+                                t ->
+                                        String.format(
+                                                "(^%s$)",
+                                                t.getDatabaseName()
+                                                        + separatorRex
+                                                        + t.getObjectName()))
+                        .collect(Collectors.joining("|"));
+        excludingPattern = "?!" + excludingPattern;
+        return String.format("(%s)(%s)", excludingPattern, includingPattern);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        env.execute(String.format("MongoDB-Paimon Database Sync: %s", 
database));
+    }
+}
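
The table-list template described in the comments of `buildTableList` combines a negative lookahead with an including pattern. A minimal sketch of that construction follows, assuming excluded tables are given as bare collection names within a single database; `TableListSketch` and its parameter names are hypothetical and not part of this commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TableListSketch {

    /**
     * Builds "(?!(^db\.t1$)|(^db\.t2$))(db\.(including))". When nothing is excluded,
     * only the including part "db\.(including)" is returned.
     */
    static String buildTableList(
            String database, String includingTables, List<String> excludedTables) {
        String separatorRex = "\\.";
        String including = String.format("%s%s(%s)", database, separatorRex, includingTables);
        if (excludedTables.isEmpty()) {
            return including;
        }
        String excluding =
                excludedTables.stream()
                        .map(t -> String.format("(^%s$)", database + separatorRex + t))
                        .collect(Collectors.joining("|"));
        return String.format("(?!%s)(%s)", excluding, including);
    }

    public static void main(String[] args) {
        String pattern = buildTableList("source_db", ".*", Arrays.asList("tmp"));
        System.out.println(pattern);
        // The lookahead rejects only the exact qualified name "source_db.tmp".
        System.out.println(Pattern.matches(pattern, "source_db.orders"));
        System.out.println(Pattern.matches(pattern, "source_db.tmp"));
    }
}
```

Because the excluded names are anchored with `^` and `$`, a collection such as `source_db.tmp2` still matches the including pattern.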
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
new file mode 100644
index 000000000..2fb2e0bbc
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionFactory;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link MongoDBSyncDatabaseAction}. */
+public class MongoDBSyncDatabaseActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "mongodb-sync-database";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        checkRequiredArgument(params, "warehouse");
+        checkRequiredArgument(params, "database");
+        checkRequiredArgument(params, "mongodb-conf");
+
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+        String tablePrefix = params.get("table-prefix");
+        String tableSuffix = params.get("table-suffix");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
+
+        Map<String, String> mongodbConfigOption = optionalConfigMap(params, 
"mongodb-conf");
+        Map<String, String> catalogConfigOption = optionalConfigMap(params, 
"catalog-conf");
+        Map<String, String> tableConfigOption = optionalConfigMap(params, 
"table-conf");
+        return Optional.of(
+                new MongoDBSyncDatabaseAction(
+                        mongodbConfigOption,
+                        warehouse,
+                        database,
+                        tablePrefix,
+                        tableSuffix,
+                        includingTables,
+                        excludingTables,
+                        catalogConfigOption,
+                        tableConfigOption));
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"mongodb-sync-database\" creates a streaming job "
+                        + "with a Flink MongoDB CDC source and multiple Paimon 
table sinks "
+                        + "to synchronize a whole MongoDB database into one 
Paimon database.\n"
+                        + "Only MongoDB tables with a primary key that includes `_id` will be taken into consideration. "
+                        + "Any MongoDB tables created after the commencement of the task will automatically be included.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  mongodb-sync-database --warehouse <warehouse-path> 
--database <database-name> "
+                        + "[--table-prefix <paimon-table-prefix>] "
+                        + "[--table-suffix <paimon-table-suffix>] "
+                        + "[--including-tables 
<mongodb-table-name|name-regular-expr>] "
+                        + "[--excluding-tables 
<mongodb-table-name|name-regular-expr>] "
+                        + "[--mongodb-conf <mongodb-cdc-source-conf> 
[--mongodb-conf <mongodb-cdc-source-conf> ...]] "
+                        + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
+                        + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
+        System.out.println();
+
+        System.out.println(
+                "--table-prefix is the prefix of all Paimon tables to be 
synchronized. For example, if you want all "
+                        + "synchronized tables to have \"ods_\" as prefix, you 
can specify `--table-prefix ods_`.");
+        System.out.println("The usage of --table-suffix is the same as `--table-prefix`.");
+        System.out.println();
+
+        System.out.println(
+                "--including-tables is used to specify which source tables are 
to be synchronized. "
+                        + "You must use '|' to separate multiple tables. 
Regular expression is supported.");
+        System.out.println(
+                "--excluding-tables is used to specify which source tables are 
not to be synchronized. "
+                        + "The usage is same as --including-tables.");
+        System.out.println(
+                "--excluding-tables has higher priority than 
--including-tables if you specified both.");
+        System.out.println();
+
+        System.out.println("MongoDB CDC source conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "'hosts', 'username', 'password' and 'database' "
+                        + "are required configurations, others are optional. "
+                        + "Note that 'database' should be the exact name "
+                        + "of the MongoDB database you want to synchronize. "
+                        + "It can't be a regular expression.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+        System.out.println();
+
+        System.out.println("Paimon catalog and table sink conf syntax:");
+        System.out.println("  key=value");
+        System.out.println("The same set of configurations will be applied to all Paimon sink tables.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://paimon.apache.org/docs/master/maintenance/configurations/");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  mongodb-sync-database \\\n"
+                        + "    --warehouse hdfs:///path/to/warehouse \\\n"
+                        + "    --database test_db \\\n"
+                        + "    --mongodb-conf hosts=127.0.0.1:27017 \\\n"
+                        + "    --mongodb-conf username=root \\\n"
+                        + "    --mongodb-conf password=123456 \\\n"
+                        + "    --mongodb-conf database=source_db \\\n"
+                        + "    --catalog-conf metastore=hive \\\n"
+                        + "    --catalog-conf uri=thrift://hive-metastore:9083 
\\\n"
+                        + "    --table-conf bucket=4 \\\n"
+                        + "    --table-conf changelog-producer=input \\\n"
+                        + "    --table-conf sink.parallelism=4");
+    }
+}
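
The help text above describes repeated `--mongodb-conf key=value` arguments. The helper that parses them (`optionalConfigMap`) is not shown in this diff, so the following is only a hypothetical sketch of how such pairs could be folded into a map, assuming the split happens at the first `=`; `ConfSketch` and `parse` are illustrative names and the real implementation may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConfSketch {

    /** Parses "key=value" strings into a map, splitting each at the first '='. */
    static Map<String, String> parse(List<String> pairs) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String pair : pairs) {
            int idx = pair.indexOf('=');
            if (idx <= 0) {
                throw new IllegalArgumentException(
                        "Invalid conf '" + pair + "', expected key=value");
            }
            // Everything after the first '=' is kept, so values may contain '='.
            conf.put(pair.substring(0, idx), pair.substring(idx + 1));
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                parse(Arrays.asList("hosts=127.0.0.1:27017", "database=source_db"));
        System.out.println(conf.get("hosts") + " / " + conf.get("database"));
    }
}
```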
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
new file mode 100644
index 000000000..6283f521d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * An {@link Action} which synchronizes one MongoDB collection into one Paimon table.
+ *
+ * <p>You should specify the MongoDB source collection in {@code mongodbConfig}. See <a
+ * href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document
+ * of flink-cdc-connectors</a> for detailed keys and values.
+ *
+ * <p>If the specified Paimon table does not exist, this action will automatically create the table.
+ * Its schema will be derived from the specified MongoDB collection. If the Paimon table already
+ * exists, its schema will be compared against the schema of the specified MongoDB collection.
+ *
+ * <p>This action supports a limited number of schema changes. Unsupported schema changes will be
+ * ignored. Currently supported schema changes include:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ * </ul>
+ */
+public class MongoDBSyncTableAction extends ActionBase {
+    public final Configuration mongodbConfig;
+    public final String database;
+    public final String table;
+    public final List<String> partitionKeys;
+    public final Map<String, String> tableConfig;
+
+    public MongoDBSyncTableAction(
+            Map<String, String> mongodbConfig,
+            String warehouse,
+            String database,
+            String table,
+            List<String> partitionKeys,
+            Map<String, String> catalogConfig,
+            Map<String, String> tableConfig) {
+        super(warehouse, catalogConfig);
+        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+        this.database = database;
+        this.table = table;
+        this.partitionKeys = partitionKeys;
+        this.tableConfig = tableConfig;
+    }
+
+    public void build(StreamExecutionEnvironment env) throws Exception {
+        checkArgument(
+                mongodbConfig.contains(MongoDBSourceOptions.COLLECTION),
+                String.format(
+                        "mongodb-conf [%s] must be specified.",
+                        MongoDBSourceOptions.COLLECTION.key()));
+
+        String tableList =
+                mongodbConfig.get(MongoDBSourceOptions.DATABASE)
+                        + "\\."
+                        + mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+        MongoDBSource<String> source =
+                MongoDBActionUtils.buildMongodbSource(mongodbConfig, 
tableList);
+
+        boolean caseSensitive = catalog.caseSensitive();
+
+        if (!caseSensitive) {
+            validateCaseInsensitive();
+        }
+
+        MongodbSchema mongodbSchema = 
MongodbSchema.getMongodbSchema(mongodbConfig);
+        catalog.createDatabase(database, true);
+
+        Identifier identifier = new Identifier(database, table);
+        FileStoreTable table;
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
+                RichCdcMultiplexRecordEventParser::new;
+        Schema fromMongodb =
+                MongoDBActionUtils.buildPaimonSchema(
+                        mongodbSchema, partitionKeys, tableConfig, 
caseSensitive);
+        try {
+            table = (FileStoreTable) catalog.getTable(identifier);
+        } catch (Exception e) {
+            catalog.createTable(identifier, fromMongodb, false);
+            table = (FileStoreTable) catalog.getTable(identifier);
+        }
+
+        CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+                new CdcSinkBuilder<RichCdcMultiplexRecord>()
+                        .withInput(
+                                env.fromSource(
+                                                source,
+                                                
WatermarkStrategy.noWatermarks(),
+                                                "MongoDB Source")
+                                        .flatMap(
+                                                new MongoDBRecordParser(
+                                                        caseSensitive, 
mongodbConfig)))
+                        .withParserFactory(parserFactory)
+                        .withTable(table)
+                        .withIdentifier(identifier)
+                        .withCatalogLoader(catalogLoader());
+        String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+        sinkBuilder.build();
+    }
+
+    private void validateCaseInsensitive() {
+        checkArgument(
+                database.equals(database.toLowerCase()),
+                String.format(
+                        "Database name [%s] cannot contain upper case in case-insensitive catalog.",
+                        database));
+        checkArgument(
+                table.equals(table.toLowerCase()),
+                String.format(
+                        "Table name [%s] cannot contain upper case in case-insensitive catalog.",
+                        table));
+        for (String part : partitionKeys) {
+            checkArgument(
+                    part.equals(part.toLowerCase()),
+                    String.format(
+                            "Partition keys [%s] cannot contain upper case in case-insensitive catalog.",
+                            partitionKeys));
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        execute(env, String.format("MongoDB-Paimon Table Sync: %s.%s", database, table));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
new file mode 100644
index 000000000..96249895a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionFactory;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Factory to create {@link MongoDBSyncTableAction}. */
+public class MongoDBSyncTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "mongodb-sync-table";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+
+        List<String> partitionKeys = Collections.emptyList();
+        if (params.has("partition-keys")) {
+            partitionKeys =
+                    Arrays.stream(params.get("partition-keys").split(","))
+                            .collect(Collectors.toList());
+        }
+
+        checkRequiredArgument(params, "mongodb-conf");
+
+        Map<String, String> mongodbConfig = optionalConfigMap(params, "mongodb-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> tableConfig = optionalConfigMap(params, "table-conf");
+
+        return Optional.of(
+                new MongoDBSyncTableAction(
+                        mongodbConfig,
+                        tablePath.f0,
+                        tablePath.f1,
+                        tablePath.f2,
+                        partitionKeys,
+                        catalogConfig,
+                        tableConfig));
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"mongodb-sync-table\" creates a streaming job "
+                        + "with a Flink MongoDB CDC source and a Paimon table sink to consume CDC events.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  mongodb-sync-table --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> "
+                        + "[--partition-keys <partition-keys>] "
+                        + "--mongodb-conf <mongodb-cdc-source-conf> [--mongodb-conf <mongodb-cdc-source-conf> ...] "
+                        + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] "
+                        + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
+        System.out.println();
+
+        System.out.println("Partition keys syntax:");
+        System.out.println("  key1,key2,...");
+        System.out.println(
+                "If partition key is not defined and the specified Paimon table does not exist, "
+                        + "this action will automatically create an unpartitioned Paimon table.");
+        System.out.println();
+
+        System.out.println("MongoDB CDC source conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "'hosts', 'username', 'password', 'database' and 'collection' "
+                        + "are required configurations, others are optional.");
+        System.out.println(
+                "The 'mongodb-conf' introduces the 'schema.start.mode' parameter on top of the MongoDB CDC source configuration. 'schema.start.mode' provides two modes: 'dynamic' (default) and 'specified'. "
+                        + "In 'dynamic' mode, MongoDB schema information is parsed at one level, which forms the basis for schema change evolution. "
+                        + "In 'specified' mode, synchronization takes place according to specified criteria. "
+                        + "This can be done by configuring 'field.name' to specify the synchronization fields and 'parser.path' to specify the JSON parsing path for those fields.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+        System.out.println();
+
+        System.out.println("Paimon catalog and table sink conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://paimon.apache.org/docs/master/maintenance/configurations/");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  mongodb-sync-table \\\n"
+                        + "    --warehouse hdfs:///path/to/warehouse \\\n"
+                        + "    --database test_db \\\n"
+                        + "    --table test_table \\\n"
+                        + "    --partition-keys pt \\\n"
+                        + "    --mongodb-conf hosts=127.0.0.1:27017 \\\n"
+                        + "    --mongodb-conf username=root \\\n"
+                        + "    --mongodb-conf password=123456 \\\n"
+                        + "    --mongodb-conf database=source_db \\\n"
+                        + "    --mongodb-conf collection='source_table' \\\n"
+                        + "    --catalog-conf metastore=hive \\\n"
+                        + "    --catalog-conf uri=thrift://hive-metastore:9083 \\\n"
+                        + "    --table-conf bucket=4 \\\n"
+                        + "    --table-conf changelog-producer=input \\\n"
+                        + "    --table-conf sink.parallelism=4");
+    }
+}
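
The help text above describes repeated `--mongodb-conf key=value` arguments. How such entries can be folded into a map is sketched below. This is only an illustration: `ConfArgs` and `parseKeyValues` are hypothetical names, and this is not the `optionalConfigMap` helper the factory calls, whose implementation is not part of this diff.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, for illustration only. */
class ConfArgs {

    /** Splits each "key=value" entry at the first '=' and collects the pairs in order. */
    static Map<String, String> parseKeyValues(List<String> entries) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String entry : entries) {
            int idx = entry.indexOf('=');
            if (idx <= 0) {
                // No '=' at all, or an empty key.
                throw new IllegalArgumentException("Invalid key=value entry: '" + entry + "'");
            }
            conf.put(entry.substring(0, idx).trim(), entry.substring(idx + 1).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                parseKeyValues(
                        Arrays.asList(
                                "hosts=127.0.0.1:27017",
                                "database=source_db",
                                "collection=source_table"));
        System.out.println(conf);
    }
}
```

Splitting at the first `=` only means that values which themselves contain `=` (for example connection options) survive intact.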
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
new file mode 100644
index 000000000..80cde22df
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.types.DataType;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.configuration.Configuration;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
+import static org.apache.paimon.shade.guava30.com.google.common.collect.Lists.newArrayList;
+import static org.apache.paimon.types.DataTypes.STRING;
+
+/** Schema (database, collection, fields and primary keys) derived for a MongoDB collection. */
+public class MongodbSchema {
+
+    private final String databaseName;
+    private final String tableName;
+    private final Map<String, DataType> fields;
+    private final List<String> primaryKeys;
+
+    public MongodbSchema(
+            String databaseName,
+            String tableName,
+            Map<String, DataType> fields,
+            List<String> primaryKeys) {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.fields = fields;
+        this.primaryKeys = primaryKeys;
+    }
+
+    public String tableName() {
+        return tableName;
+    }
+
+    public String databaseName() {
+        return databaseName;
+    }
+
+    public Map<String, DataType> fields() {
+        return fields;
+    }
+
+    public List<String> primaryKeys() {
+        return primaryKeys;
+    }
+
+    public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
+
+        ModeEnum mode = ModeEnum.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
+        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
+        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+        String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+        switch (mode) {
+            case SPECIFIED:
+                String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
+                Map<String, DataType> schemaFields =
+                        generateSchemaFields(Arrays.asList(columnNames));
+                return new MongodbSchema(
+                        databaseName, collectionName, schemaFields, newArrayList("_id"));
+            case DYNAMIC:
+                String url = String.format("mongodb://%s/%s", hosts, databaseName);
+                try (MongoClient mongoClient = MongoClients.create(url)) {
+                    MongoDatabase database = mongoClient.getDatabase(databaseName);
+                    MongoCollection<Document> collection = database.getCollection(collectionName);
+                    Document firstDocument = collection.find().first();
+                    return createMongodbSchema(
+                            databaseName, collectionName, getColumnNames(firstDocument));
+                }
+            default:
+                throw new RuntimeException("Unsupported schema start mode: " + mode);
+        }
+    }
+
+    private static List<String> getColumnNames(Document document) {
+        if (document != null) {
+            return new ArrayList<>(document.keySet());
+        }
+        return null;
+    }
+
+    private static Map<String, DataType> generateSchemaFields(List<String> columnNames) {
+        Map<String, DataType> schemaFields = new LinkedHashMap<>();
+
+        if (columnNames != null) {
+            for (String columnName : columnNames) {
+                schemaFields.put(columnName, STRING());
+            }
+        }
+
+        return schemaFields;
+    }
+
+    private static MongodbSchema createMongodbSchema(
+            String databaseName, String collectionName, List<String> columnNames) {
+        Map<String, DataType> schemaFields = generateSchemaFields(columnNames);
+        return new MongodbSchema(databaseName, collectionName, schemaFields, newArrayList("_id"));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MongodbSchema)) {
+            return false;
+        }
+        MongodbSchema that = (MongodbSchema) o;
+        return databaseName.equals(that.databaseName)
+                && tableName.equals(that.tableName)
+                && fields.equals(that.fields)
+                && primaryKeys.equals(that.primaryKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(databaseName, tableName, fields, primaryKeys);
+    }
+}
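
As the class above shows, both start modes end up with every column typed as `STRING` and `_id` as the only primary key; they differ only in where the column names come from (the `field.name` option, or the keys of the first document in the collection). A minimal, dependency-free sketch of that mapping follows. `SchemaSketch` and `allStringFields` are hypothetical names, and plain type-name strings stand in for Paimon's `DataType`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the schema derivation, for illustration only. */
class SchemaSketch {

    /** Maps every column name to the type name "STRING", preserving input order. */
    static Map<String, String> allStringFields(List<String> columnNames) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (columnNames != null) {
            for (String name : columnNames) {
                fields.put(name, "STRING");
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // 'specified' mode: names come from a comma-separated option value.
        System.out.println(allStringFields(Arrays.asList("_id,name,address".split(","))));
        // 'dynamic' mode with an empty collection: no first document, so no columns.
        System.out.println(allStringFields(null));
    }
}
```

Note that a `null` column list (an empty collection in dynamic mode) yields a schema with no fields at all, while the primary key is still declared as `_id`.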
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
new file mode 100644
index 000000000..1304ade16
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb.strategy;
+
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Record extraction strategy for MongoDB versions above 4.x and below 6.x. */
+public class Mongo4VersionStrategy implements MongoVersionStrategy {
+
+    private static final String FIELD_TYPE = "operationType";
+    private static final String FIELD_DATA = "fullDocument";
+    private static final String OP_UPDATE = "update";
+    private static final String OP_INSERT = "insert";
+    private static final String OP_REPLACE = "replace";
+
+    private final String databaseName;
+    private final String collection;
+    private final boolean caseSensitive;
+    private final Configuration mongodbConfig;
+
+    public Mongo4VersionStrategy(
+            String databaseName,
+            String collection,
+            boolean caseSensitive,
+            Configuration mongodbConfig) {
+        this.databaseName = databaseName;
+        this.collection = collection;
+        this.caseSensitive = caseSensitive;
+        this.mongodbConfig = mongodbConfig;
+    }
+
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords(JsonNode root)
+            throws JsonProcessingException {
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
+
+        String op = root.get(FIELD_TYPE).asText();
+        JsonNode fullDocument = root.get(FIELD_DATA);
+        // extract row kind and field values
+        switch (op) {
+            case OP_INSERT:
+                Map<String, String> insert =
+                        getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+                records.add(
+                        new RichCdcMultiplexRecord(
+                                databaseName,
+                                collection,
+                                paimonFieldTypes,
+                                extractPrimaryKeys(),
+                                new CdcRecord(RowKind.INSERT, insert)));
+                break;
+            case OP_REPLACE:
+            case OP_UPDATE:
+                Map<String, String> after =
+                        getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+                records.add(
+                        new RichCdcMultiplexRecord(
+                                databaseName,
+                                collection,
+                                paimonFieldTypes,
+                                extractPrimaryKeys(),
+                                new CdcRecord(RowKind.UPDATE_AFTER, after)));
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record type: " + op);
+        }
+        return records;
+    }
+}
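
The switch in `extractRecords` above boils down to a small mapping from the change event's `operationType` to a row kind. The sketch below restates it without any Paimon or Jackson dependency; `OpMapping` and its `Kind` enum are hypothetical stand-ins for `RowKind`. As in the diff, any other operation type (a `delete`, for instance) ends in the default branch and is rejected.

```java
/** Hypothetical stand-in for the operationType-to-RowKind mapping, for illustration only. */
class OpMapping {

    /** Mirrors the two row kinds emitted by the strategy in this diff. */
    enum Kind {
        INSERT,
        UPDATE_AFTER
    }

    static Kind toKind(String operationType) {
        switch (operationType) {
            case "insert":
                return Kind.INSERT;
            case "replace":
            case "update":
                // Both carry the full document and are written as the "after" image.
                return Kind.UPDATE_AFTER;
            default:
                throw new UnsupportedOperationException("Unknown record type: " + operationType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toKind("insert"));
        System.out.println(toKind("replace"));
        System.out.println(toKind("update"));
    }
}
```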
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
new file mode 100644
index 000000000..6d19ed6e4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb.strategy;
+
+import org.apache.paimon.flink.action.cdc.mongodb.JsonParserUtils;
+import org.apache.paimon.flink.action.cdc.mongodb.ModeEnum;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.PARSER_PATH;
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Processing strategies for different mongodb versions. */
+public interface MongoVersionStrategy {
+
+    ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    List<RichCdcMultiplexRecord> extractRecords(JsonNode root) throws JsonProcessingException;
+
+    default List<String> extractPrimaryKeys() {
+        List<String> primaryKeys = new ArrayList<>();
+        primaryKeys.add("_id");
+        return primaryKeys;
+    }
+
+    default Map<String, String> extractRow(String record) {
+        return JsonParserUtils.extractMap(record);
+    }
+
+    default Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
+        Map<String, String> keyCaseInsensitive = new HashMap<>();
+        for (Map.Entry<String, String> entry : origin.entrySet()) {
+            String fieldName = entry.getKey().toLowerCase();
+            checkArgument(
+                    !keyCaseInsensitive.containsKey(fieldName),
+                    "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
+                    origin);
+            keyCaseInsensitive.put(fieldName, entry.getValue());
+        }
+        return keyCaseInsensitive;
+    }
+
+    default Map<String, String> getExtractRow(
+            JsonNode jsonNode,
+            LinkedHashMap<String, DataType> paimonFieldTypes,
+            boolean caseSensitive,
+            Configuration mongodbConfig)
+            throws JsonProcessingException {
+        ModeEnum mode = ModeEnum.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
+        ObjectNode objectNode = (ObjectNode) OBJECT_MAPPER.readTree(jsonNode.asText());
+        JsonNode document = objectNode.set("_id", objectNode.get("_id").get("$oid"));
+        switch (mode) {
+            case SPECIFIED:
+                Map<String, String> specifiedRow =
+                        getSpecifiedRow(
+                                document.toString(),
+                                mongodbConfig.getString(PARSER_PATH),
+                                mongodbConfig.getString(FIELD_NAME),
+                                paimonFieldTypes);
+                return caseSensitive ? specifiedRow : keyCaseInsensitive(specifiedRow);
+            case DYNAMIC:
+                Map<String, String> dynamicRow =
+                        getDynamicRow(document.toString(), paimonFieldTypes);
+                return caseSensitive ? dynamicRow : keyCaseInsensitive(dynamicRow);
+            default:
+                throw new RuntimeException("Unsupported schema start mode: " + mode);
+        }
+    }
+
+    default Map<String, String> getDynamicRow(
+            String evaluate, LinkedHashMap<String, DataType> paimonFieldTypes) {
+        Map<String, String> linkedHashMap = JsonParserUtils.extractMap(evaluate);
+        Set<String> keySet = linkedHashMap.keySet();
+        String[] columns = keySet.toArray(new String[0]);
+        for (String column : columns) {
+            paimonFieldTypes.put(column, DataTypes.STRING());
+        }
+        return extractRow(evaluate);
+    }
+
+    static Map<String, String> getSpecifiedRow(
+            String record,
+            String parsePath,
+            String fieldName,
+            LinkedHashMap<String, DataType> paimonFieldTypes) {
+        Map<String, String> resultMap = new HashMap<>();
+        // 'fieldName' and 'parsePath' are comma-separated lists matched by position.
+        String[] columnNames = fieldName.split(",");
+        String[] parseNames = parsePath.split(",");
+        for (int i = 0; i < parseNames.length; i++) {
+            paimonFieldTypes.put(columnNames[i], DataTypes.STRING());
+            String evaluate = JsonParserUtils.evaluate(record, "$." + parseNames[i]);
+            if (evaluate == null) {
+                resultMap.put(columnNames[i], "{}");
+            } else {
+                resultMap.put(columnNames[i], evaluate);
+            }
+        }
+        return resultMap;
+    }
+}
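
The `keyCaseInsensitive` default method above lower-cases every key and refuses input in which two keys collide after folding. A self-contained sketch of the same idea, using only the JDK, is shown below. `CaseFolding` and `lowerCaseKeys` are hypothetical names, and a plain `IllegalArgumentException` stands in for Paimon's `checkArgument`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the key folding step, for illustration only. */
class CaseFolding {

    /** Returns a copy of the map with lower-cased keys; fails if two keys collide. */
    static Map<String, String> lowerCaseKeys(Map<String, String> origin) {
        Map<String, String> folded = new HashMap<>();
        for (Map.Entry<String, String> entry : origin.entrySet()) {
            String key = entry.getKey().toLowerCase();
            if (folded.containsKey(key)) {
                throw new IllegalArgumentException(
                        "Duplicate key after lower-casing: " + key + ". Original map: " + origin);
            }
            folded.put(key, entry.getValue());
        }
        return folded;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("_id", "abc");
        row.put("UserName", "alice");
        System.out.println(lowerCaseKeys(row).get("username"));
    }
}
```

Failing fast on a collision avoids silently dropping one of two fields such as `Name` and `name` when the target catalog is case-insensitive.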
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index f2cfb71ce..2e67176dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -28,3 +28,5 @@ org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableActionFactory
 org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
 org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableActionFactory
 org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory
+org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncTableActionFactory
+org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
new file mode 100644
index 000000000..d1db139e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** UT cases for {@link JsonParserUtils}. */
+public class JsonParserUtilsTest {
+
+    @Test
+    public void testGetJsonObject() {
+        String json =
+                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": { \"totalResults\": 1, \"resultsPerPage\": 1 }, \"items\": [ { 
\"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\"  [...]
+        String output = JsonParserUtils.evaluate(json, "$.pageInfo.totalResults");
+        assertEquals("1", output);
+    }
+
+    @Test
+    public void testNestedGetJsonObject() {
+        String json =
+                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": { \"pagehit\":{ \"kind\": \"youtube#video\" } ,\"totalResults\": 
1, \"resultsPerPage\": 1 }, \"items\": [ { \"kind\": \"youtube#video\", 
\"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", 
\"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", 
\"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favo [...]
+        String output = JsonParserUtils.evaluate(json, "$.pageInfo.pagehit");
+        assertEquals("{\"kind\":\"youtube#video\"}", output);
+    }
+
+    @Test
+    public void testStringWhenNotJson() {
+        String json =
+                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
+        String output = JsonParserUtils.evaluate(json, "$.pageinfo.test_field");
+        assertNull(output);
+    }
+
+    @Test
+    public void testStringWhenJson() {
+        String json =
+                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
+        String output = JsonParserUtils.evaluate(json, "$.pageInfo");
+        assertEquals("page", output);
+    }
+
+    @Test
+    public void testJsonArray() {
+        String json =
+                "{\"country\":\"Switzerland\",\"languages\": [ \"Italian\" ],\"religions\": \"christian\"}";
+        String output = JsonParserUtils.evaluate(json, "$.languages[0]");
+        assertEquals("Italian", output);
+    }
+
+    @Test
+    public void testJsonMap() {
+        String json =
+                "{\"country\":\"Switzerland\",\"languages\":\"Italian\",\"religions\": { \"f\": \"v\", \"n\":null} }";
+        String output = JsonParserUtils.evaluate(json, "$.religions.f");
+        assertEquals("v", output);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
new file mode 100644
index 000000000..77bd19f63
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link org.apache.paimon.flink.action.Action}s related 
to MongoDB. */
+public abstract class MongoDBActionITCaseBase extends ActionITCaseBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBActionITCaseBase.class);
+    protected static MongoClient client;
+    public static final MongoDBContainer MONGODB_CONTAINER =
+            new MongoDBContainer("mongo:6.0.6")
+                    .withSharding()
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    protected StreamExecutionEnvironment env;
+    protected StreamTableEnvironment tEnv;
+
+    @BeforeEach
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+    }
+
+    @BeforeAll
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        // 
MONGODB_CONTAINER.setPortBindings(Collections.singletonList("27017:27017"));
+        Startables.deepStart(Stream.of(MONGODB_CONTAINER)).join();
+        LOG.info("Containers are started.");
+        MongoClientSettings settings =
+                MongoClientSettings.builder()
+                        .applyConnectionString(
+                                new 
ConnectionString(MONGODB_CONTAINER.getConnectionString()))
+                        .build();
+        client = MongoClients.create(settings);
+    }
+
+    @AfterAll
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        MONGODB_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    protected Map<String, String> getBasicMongoDBConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put("hosts", MONGODB_CONTAINER.getHostAndPort());
+        return config;
+    }
+
+    protected String createRecordsToMongoDB(String fileName, String content) {
+        return 
MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, content);
+    }
+
+    protected static String writeRecordsToMongoDB(String fileName, String 
dbName, String content) {
+        return 
MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, 
content);
+    }
+
+    protected void waitJobRunning(JobClient client) throws Exception {
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    protected void waitForResult(
+            List<String> expected, FileStoreTable table, RowType rowType, 
List<String> primaryKeys)
+            throws Exception {
+        assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
+
+        // wait for table schema to become our expected schema
+        while (true) {
+            if (rowType.getFieldCount() == table.schema().fields().size()) {
+                int cnt = 0;
+                for (int i = 0; i < table.schema().fields().size(); i++) {
+                    DataField field = table.schema().fields().get(i);
+                    boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
+                    boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
+                    if (sameName && sameType) {
+                        cnt++;
+                    }
+                }
+                if (cnt == rowType.getFieldCount()) {
+                    break;
+                }
+            }
+            table = table.copyWithLatestSchema();
+            Thread.sleep(1000);
+        }
+
+        // wait for data to become expected
+        List<String> sortedExpected = new ArrayList<>(expected);
+        Collections.sort(sortedExpected);
+        while (true) {
+            ReadBuilder readBuilder = table.newReadBuilder();
+            TableScan.Plan plan = readBuilder.newScan().plan();
+            List<String> result =
+                    getResult(
+                            readBuilder.newRead(),
+                            plan == null ? Collections.emptyList() : 
plan.splits(),
+                            rowType);
+            List<String> sortedActual = new ArrayList<>(result);
+            Collections.sort(sortedActual);
+            LOG.info("Expected sortedExpected is:{}", sortedExpected);
+            LOG.info("Actual sortedActual is:{}", sortedActual);
+            if (sortedExpected.equals(sortedActual)) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    protected FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog().getTable(identifier);
+    }
+
+    protected Map<String, String> getBasicTableConfig() {
+        Map<String, String> config = new HashMap<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        config.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        config.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
+        return config;
+    }
+
+    protected void waitTablesCreated(String... tables) throws Exception {
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        List<String> existed = catalog().listTables(database);
+                        return existed.containsAll(Arrays.asList(tables));
+                    } catch (Catalog.DatabaseNotExistException e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                Duration.ofMinutes(5),
+                Duration.ofMillis(100),
+                "Failed to wait for tables to be created in 5 minutes.");
+    }
+}
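Note on the polling helpers above: waitJobRunning and waitForResult loop until their condition holds and rely on the JUnit @Timeout of the calling test to end a stuck run. A minimal sketch of a deadline-bounded variant follows, using only the JDK. The class name BoundedPoll and the method waitUntil are hypothetical and are not part of this commit.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of the commit: polls a condition with a hard deadline. */
public final class BoundedPoll {

    private BoundedPoll() {}

    /**
     * Re-checks {@code condition} every {@code interval} until it returns true or
     * {@code timeout} has elapsed. Returns true if the condition was met, false on
     * timeout or if the waiting thread was interrupted.
     */
    public static boolean waitUntil(
            BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline.
                long remainingMillis = Math.max(1L, remainingNanos / 1_000_000L);
                Thread.sleep(Math.min(interval.toMillis(), remainingMillis));
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean met =
                waitUntil(() -> ++calls[0] >= 3, Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("condition met: " + met + ", checks: " + calls[0]);

        boolean neverMet = waitUntil(() -> false, Duration.ofMillis(50), Duration.ofMillis(10));
        System.out.println("condition met: " + neverMet);
    }
}
```

A caller could then fail with a message that names the unmet condition instead of surfacing only a generic test timeout.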
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
new file mode 100644
index 000000000..10eaf9d3f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Random;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+
+/** Container for testing MongoDB >= 5.0.3. */
+public class MongoDBContainer extends 
org.testcontainers.containers.MongoDBContainer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBContainer.class);
+
+    private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)//.*$");
+
+    public static final int MONGODB_PORT = 27017;
+
+    public static final String FLINK_USER = "flinkuser";
+
+    public static final String FLINK_USER_PASSWORD = 
"a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
+
+    public MongoDBContainer(String imageName) {
+        super(imageName);
+    }
+
+    @Override
+    protected void containerIsStarted(InspectContainerResponse containerInfo, 
boolean reused) {
+        super.containerIsStarted(containerInfo, reused);
+
+        final String setupFilePath = "mongodb/docker/setup.js";
+        final URL setupFile = 
MongoDBContainer.class.getClassLoader().getResource(setupFilePath);
+
+        assertNotNull("Cannot locate " + setupFilePath, setupFile);
+        try {
+            String createUserCommand =
+                    Files.readAllLines(Paths.get(setupFile.toURI())).stream()
+                            .filter(x -> StringUtils.isNotBlank(x) && 
!x.trim().startsWith("//"))
+                            .map(
+                                    x -> {
+                                        final Matcher m = 
COMMENT_PATTERN.matcher(x);
+                                        return m.matches() ? m.group(1) : x;
+                                    })
+                            .collect(Collectors.joining(" "));
+            ExecResult execResult =
+                    execInContainer(
+                            "mongosh",
+                            "--eval",
+                            "use admin",
+                            "--eval",
+                            createUserCommand,
+                            "--eval",
+                            "console.log('Flink test user created.\\n');");
+            LOG.info(execResult.getStdout());
+            if (execResult.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        "Execute mongo command failed " + 
execResult.getStderr());
+            }
+            this.waitingFor(Wait.forLogMessage("Flink test user created.\\s", 
1));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public MongoDBContainer withSharding() {
+        return (MongoDBContainer) super.withSharding();
+    }
+
+    @Override
+    public MongoDBContainer withLogConsumer(Consumer<OutputFrame> consumer) {
+        return (MongoDBContainer) super.withLogConsumer(consumer);
+    }
+
+    @Override
+    public MongoDBContainer withNetwork(Network network) {
+        return (MongoDBContainer) super.withNetwork(network);
+    }
+
+    @Override
+    public MongoDBContainer withNetworkAliases(String... aliases) {
+        return (MongoDBContainer) super.withNetworkAliases(aliases);
+    }
+
+    @Override
+    public MongoDBContainer withStartupTimeout(Duration timeout) {
+        return (MongoDBContainer) super.withStartupTimeout(timeout);
+    }
+
+    public void executeCommand(String command) {
+        try {
+            LOG.info("Executing mongo command: {}", command);
+            ExecResult execResult = execInContainer("mongosh", "--eval", 
command);
+            LOG.info(execResult.getStdout());
+            if (execResult.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        "Execute mongo command failed " + 
execResult.getStderr());
+            }
+        } catch (InterruptedException | IOException e) {
+            throw new IllegalStateException("Execute mongo command failed", e);
+        }
+    }
+
+    public String executeCommandInDatabase(String command, String 
databaseName) {
+        try {
+            executeCommand(String.format("db = db.getSiblingDB('%s');\n", 
databaseName) + command);
+            return databaseName;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Executes a mongo command in a separate database. */
+    public String executeCommandInSeparateDatabase(String command, String 
baseName) {
+        return executeCommandInDatabase(
+                command, baseName + "_" + Integer.toUnsignedString(new 
Random().nextInt(), 36));
+    }
+
+    /** Executes a mongo command file in a separate database. */
+    public String executeCommandFileInSeparateDatabase(
+            String fileNameIgnoreSuffix, String content) {
+        return executeCommandFileInDatabase(
+                fileNameIgnoreSuffix,
+                fileNameIgnoreSuffix + "_" + Integer.toUnsignedString(new 
Random().nextInt(), 36),
+                content);
+    }
+
+    public String executeCommandFileInSeparateDatabase(
+            String fileNameIgnoreSuffix, String dbName, String content) {
+        return executeCommandFileInDatabase(fileNameIgnoreSuffix, dbName, 
content);
+    }
+
+    /** Executes a mongo command file in the specified database. */
+    public String executeCommandFileInDatabase(
+            String fileNameIgnoreSuffix, String databaseName, String content) {
+        final String dbName = databaseName != null ? databaseName : 
fileNameIgnoreSuffix;
+        final String ddlFile = String.format("mongodb/%s/%s.js", content, 
fileNameIgnoreSuffix);
+        final URL ddlTestFile = 
MongoDBContainer.class.getClassLoader().getResource(ddlFile);
+        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+
+        try {
+            // use database;
+            String command0 = String.format("db = db.getSiblingDB('%s');\n", 
dbName);
+            String command1 =
+                    Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+                            .filter(x -> StringUtils.isNotBlank(x) && 
!x.trim().startsWith("//"))
+                            .map(
+                                    x -> {
+                                        final Matcher m = 
COMMENT_PATTERN.matcher(x);
+                                        return m.matches() ? m.group(1) : x;
+                                    })
+                            .collect(Collectors.joining("\n"));
+
+            executeCommand(command0 + command1);
+
+            return dbName;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getConnectionString() {
+        return String.format(
+                "mongodb://%s:%d", getContainerIpAddress(), 
getMappedPort(MONGODB_PORT));
+    }
+
+    public String getHostAndPort() {
+        return String.format("%s:%s", getContainerIpAddress(), 
getMappedPort(MONGODB_PORT));
+    }
+}
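Note on COMMENT_PATTERN in MongoDBContainer above: the expression "^(.*)//.*$" is greedy, so group(1) ends at the last "//" on the line, whether or not that "//" starts a comment. The sketch below reuses the same expression in a standalone class to show the effect on a trailing comment and on a line containing a URL. The class name CommentStripDemo, the method stripLineComment, and the sample lines are illustrative assumptions. The fixture visible in this diff has "//" only in comment lines, which the preceding filter already drops.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical demo, not part of the commit: behaviour of the comment-stripping regex. */
public final class CommentStripDemo {

    // Same expression as COMMENT_PATTERN in MongoDBContainer.
    private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");

    private CommentStripDemo() {}

    /** Returns the text before the last "//" on the line, or the line itself if none. */
    public static String stripLineComment(String line) {
        Matcher m = COMMENT_PATTERN.matcher(line);
        return m.matches() ? m.group(1) : line;
    }

    public static void main(String[] args) {
        // Trailing comment: removed as intended, the space before "//" is kept.
        System.out.println("[" + stripLineComment("db.t1.drop(); // reset") + "]");
        // prints "[db.t1.drop(); ]"

        // "//" inside a string literal: the line is cut in the middle of the URL.
        System.out.println("[" + stripLineComment("{ \"url\": \"http://example.com\" }") + "]");
        // prints "[{ "url": "http:]"
    }
}
```

Fixture files that need a "//" inside a string value would therefore need either a stricter pattern or comments kept on lines of their own.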
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
new file mode 100644
index 000000000..699357b54
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** IT cases for {@link MongoDBSyncDatabaseAction}. */
+public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
+
+    @Test
+    @Timeout(120)
+    public void testSchemaEvolution() throws Exception {
+        writeRecordsToMongoDB("test-data-1", database, "database");
+        writeRecordsToMongoDB("test-data-2", database, "database");
+
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", database);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MongoDBSyncDatabaseAction action =
+                new MongoDBSyncDatabaseAction(
+                        mongodbConfig, warehouse, database, 
Collections.emptyMap(), tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        testSchemaEvolutionImpl();
+    }
+
+    private void testSchemaEvolutionImpl() throws Exception {
+        waitTablesCreated("t1", "t2");
+
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"});
+        List<String> primaryKeys1 = Collections.singletonList("_id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 3.14]",
+                        "+I[100000000000000000000102, car battery, 12V car 
battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "address", 
"phone_number"});
+        List<String> primaryKeys2 = Collections.singletonList("_id");
+        expected =
+                Arrays.asList(
+                        "+I[100000000000000000000101, user_1, Shanghai, 
123563291234]",
+                        "+I[100000000000000000000102, user_2, Beijing, 
1234347891234]",
+                        "+I[100000000000000000000103, user_3, Hangzhou, 
1235567891234]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        writeRecordsToMongoDB("test-data-3", database, "database");
+
+        expected =
+                Arrays.asList(
+                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
+                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
+                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        writeRecordsToMongoDB("test-data-4", database, "database");
+
+        expected =
+                Arrays.asList(
+                        "+U[100000000000000000000101, user_1, Guangzhou, 
123563291234]",
+                        "+U[100000000000000000000102, user_2, Beijing, 
1234546591234]",
+                        "+U[100000000000000000000103, user_3, Nanjing, 
1235567891234]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
new file mode 100644
index 000000000..9063234dc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** IT cases for {@link MongoDBSyncTableAction}. */
+public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("inventory-1");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws 
Exception {
+        // ---------- Write the Document into MongoDB -------------------
+        String inventory = createRecordsToMongoDB(sourceDir, "table");
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", inventory);
+        mongodbConfig.put("collection", "products");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MongoDBSyncTableAction action =
+                new MongoDBSyncTableAction(
+                        mongodbConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        waitJobRunning(client);
+
+        testSchemaEvolutionImpl(inventory);
+    }
+
+    private void testSchemaEvolutionImpl(String dbName) throws Exception {
+        waitTablesCreated(tableName);
+        FileStoreTable table = getFileStoreTable(tableName);
+        List<String> primaryKeys = Collections.singletonList("_id");
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"});
+        List<String> expected =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 3.14]",
+                        "+I[100000000000000000000102, car battery, 12V car 
battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        writeRecordsToMongoDB("inventory-2", dbName, "table");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"});
+        expected =
+                Arrays.asList(
+                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
+                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
+                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        writeRecordsToMongoDB("inventory-3", dbName, "table");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {
+                            "_id", "name", "description", "weight", "hobby", 
"age", "address"
+                        });
+        expected =
+                Arrays.asList(
+                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1, NULL, 18, NULL]",
+                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
+                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350, playing computer games, NULL, NULL]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSpecifiedMode() throws Exception {
+        String inventory = createRecordsToMongoDB("inventory-1", "table");
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", inventory);
+        mongodbConfig.put("collection", "products");
+        mongodbConfig.put("field.name", "_id,name,description");
+        mongodbConfig.put("parser.path", "_id,name,description");
+        mongodbConfig.put("schema.start.mode", "specified");
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        MongoDBSyncTableAction action =
+                new MongoDBSyncTableAction(
+                        mongodbConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        waitJobRunning(client);
+        waitTablesCreated(tableName);
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(), DataTypes.STRING(), 
DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description"});
+        List<String> primaryKeys = Collections.singletonList("_id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Small 2-wheel scooter]",
+                        "+I[100000000000000000000102, car battery, 12V car battery]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-1.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-1.js
new file mode 100644
index 000000000..695e54fd7
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-1.js
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t1').insertMany([
+    {
+        "_id": ObjectId("100000000000000000000101"),
+        "name": "scooter",
+        "description": "Small 2-wheel scooter",
+        "weight": 3.14
+    },
+    {
+        "_id": ObjectId("100000000000000000000102"),
+        "name": "car battery",
+        "description": "12V car battery",
+        "weight": 8.1
+    },
+    {
+        "_id": ObjectId("100000000000000000000103"),
+        "name": "12-pack drill bits",
+        "description": "12-pack of drill bits with sizes ranging from #40 to #3",
+        "weight": 0.8
+    }
+]);
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-2.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-2.js
new file mode 100644
index 000000000..be3583995
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-2.js
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t2').insertMany([
+    { "_id": ObjectId("100000000000000000000101"), "name": "user_1", "address": "Shanghai", "phone_number": "123563291234" },
+    { "_id": ObjectId("100000000000000000000102"), "name": "user_2", "address": "Beijing", "phone_number": "1234347891234" },
+    { "_id": ObjectId("100000000000000000000103"), "name": "user_3", "address": "Hangzhou", "phone_number": "1235567891234" }
+]);
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-3.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-3.js
new file mode 100644
index 000000000..c85b85219
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-3.js
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t1').updateOne({"name": "scooter"}, {$set: {"weight": "350"}});
+db.getCollection('t1').updateOne({"name": "car battery"}, {$set: {"description": "High-performance car battery"}});
+db.getCollection('t1').updateOne({"name": "12-pack drill bits"}, {$set: {"description": "Set of 12 professional-grade drill bits"}});
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-4.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-4.js
new file mode 100644
index 000000000..2849f56af
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-4.js
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t2').updateOne({"name": "user_1"}, {$set: {"address": "Guangzhou"}});
+db.getCollection('t2').updateOne({"name": "user_2"}, {$set: {"phone_number": "1234546591234"}});
+db.getCollection('t2').updateOne({"name": "user_3"}, {$set: {"address": "Nanjing"}});
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/random.key b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/random.key
new file mode 100644
index 000000000..2abaf9a41
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/random.key
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  -- this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+XK8G9pNKhEPp/BlsKT7pHEc5i0oCpvNVZMALH5pD/6EHSuMzuyO1FpoeDwmWHXl0
++Gp+VOI89Xp7E6eqop+fFHtoM3Mnk2oTiI/442GvS0xISPTwFVY9nO3MfO8VcPVx
+J3JCAb80GeXD5x55eAOi7NqXzpjk0OKqfPEwIn1lrjlkL2m5vq6kaKEd93i1+bMh
+3LRd1jLbgwWWxqYVV92BTQNnJin+G1er7Y2FzLpeFIKqyy+I22qIE2XIC7yj3wSw
+kxwKsPN5LjFsfVeKpf169R0KgBg4Nm0qlllVUGNKuEjaVoLOEBOJgoPnhC6L2avc
+/iDeunZDlDDgYG6t6aJXJelP+W1uXp4JQj1j18Scn0lrvgWxdAVrAtK6ftxqutHc
+RQBt6Ap63zojTraulm3aeo/w/yz0zjyYjxQ5t8cojIM/7TaNLe2GfVxwhqitUPL1
+ct2YFXWwX1H/+8E7yTsnquKqe6+r0aGQqxS5x+wFMsDun/1mxv7jgjwzZc1rEk8H
+DGdhnQ7MFPOE6Bp03zGpa6B6K4I5uDgUUeOC7zmAN63cPEumuuCjPVK42sMt5wwR
+NPJyL4+sWHa9vb2sBJ1dk3thQ+wwz856BZ9ILgeMUutQgasSwctlI7t3rhM+BGYy
++naEhKWN9/cIDXtl3ZMhNWJIh/MqbluYazQ/97MZHeWc9CJXFU6yUrnJOdE0VvQd
+tROQNDuEB0Tq9ITxSYpZTY49+1CQp5E14GIc8frieWPvcbNVknriFquQfsW/tMvk
+V2Aj8sBYE+sW9sGQJlyfRrhTSN6aBG1em7ZkOAgcx2/5ftaEZTwBxNnJR9VZDYEi
+CDbobs3hIX3qhS6J9YbTEPFF2L6MMTL3ADgS44cWtmlYQrb2HJT0YLmdCzk4lSa6
+yWYLorduRtblgGo6v/nn7y41gn/l/aRdcDUsii/LgMco4ZPSRm0HixD8oA3agX9/
+23M5UVNCBO4/RKFOnjWM/2tN1xjeQrS2Hn6j3BtoTOl6k4ho
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/setup.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/setup.js
new file mode 100644
index 000000000..cbe54d967
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/docker/setup.js
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// In production you would almost certainly restrict the replication user to the follower (slave) machine,
+// to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+// However, in this database we grant the flink user the following privileges:
+//
+// 'flinkuser' - all privileges required by the snapshot reader AND stream reader (used for testing)
+//
+//
+
+//use admin;
+db.createRole(
+    {
+        role: "flinkrole",
+        privileges: [{
+            // Grant privileges on All Non-System Collections in All Databases
+            resource: { db: "", collection: "" },
+            actions: [ "splitVector", "listDatabases", "listCollections", "collStats", "find", "changeStream" ]
+        }],
+        roles: [
+            { role: 'read', db: 'config' }
+        ]
+    }
+);
+
+db.createUser(
+ {
+   user: 'flinkuser',
+   pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
+   roles: [
+      { role: 'flinkrole', db: 'admin' }
+   ]
+ }
+);
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-1.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-1.js
new file mode 100644
index 000000000..dbd43b87c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-1.js
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').insertMany([
+    {
+        "_id": ObjectId("100000000000000000000101"),
+        "name": "scooter",
+        "description": "Small 2-wheel scooter",
+        "weight": 3.14
+    },
+    {
+        "_id": ObjectId("100000000000000000000102"),
+        "name": "car battery",
+        "description": "12V car battery",
+        "weight": 8.1
+    },
+    {
+        "_id": ObjectId("100000000000000000000103"),
+        "name": "12-pack drill bits",
+        "description": "12-pack of drill bits with sizes ranging from #40 to #3",
+        "weight": 0.8
+    }
+]);
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-2.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-2.js
new file mode 100644
index 000000000..45cb5d392
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-2.js
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').updateOne({"name": "scooter"}, {$set: {"weight": "350"}});
+db.getCollection('products').updateOne({"name": "car battery"}, {$set: {"description": "High-performance car battery"}});
+db.getCollection('products').updateOne({"name": "12-pack drill bits"}, {$set: {"description": "Set of 12 professional-grade drill bits"}});
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-3.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-3.js
new file mode 100644
index 000000000..d3f766f8d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-3.js
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').updateOne(
+    {_id: ObjectId("100000000000000000000101")},
+    {$set: {hobby: "playing computer games"}}
+);
+
+db.getCollection('products').updateOne(
+    {_id: ObjectId("100000000000000000000102")},
+    {$set: {age: "18"}}
+);
+
+db.getCollection('products').updateOne(
+    {_id: ObjectId("100000000000000000000103")},
+    {$set: {address: "I live in Sanlitun"}}
+);
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-4.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-4.js
new file mode 100644
index 000000000..65f4ded3f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/inventory-4.js
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').updateOne({"name": "car battery"}, {$set: {"description": "High-performance car battery"}});

