This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eb545eb1 [flink] mongodb into the lake related code optimized (#1861)
7eb545eb1 is described below

commit 7eb545eb18185502050ab5ff0f8e6c1db9bfde73
Author: monster <[email protected]>
AuthorDate: Mon Aug 28 22:30:20 2023 +0800

    [flink] mongodb into the lake related code optimized (#1861)
---
 LICENSE                                            |   3 +
 docs/content/how-to/cdc-ingestion.md               |  57 ++++-
 .../shortcodes/generated/mongodb_functions.html    |  95 ++++++++
 .../shortcodes/generated/mongodb_operator.html     |  65 ++++++
 .../shortcodes/generated/mongodb_path_example.html |  97 ++++++++
 .../generated/mongodb_sync_database.html           |  65 ++++++
 .../shortcodes/generated/mongodb_sync_table.html   |  57 +++++
 .../org/apache/paimon/utils/JsonSerdeUtil.java     |  25 ++-
 paimon-flink/paimon-flink-common/pom.xml           |   7 +
 .../flink/action/cdc/mongodb/JsonParserUtils.java  | 247 ---------------------
 .../action/cdc/mongodb/MongoDBActionUtils.java     | 120 +++++-----
 .../action/cdc/mongodb/MongoDBRecordParser.java    |  58 +++--
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  27 +--
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  43 ++--
 .../flink/action/cdc/mongodb/MongodbSchema.java    |  83 ++++---
 .../{ModeEnum.java => SchemaAcquisitionMode.java}  |   8 +-
 .../mongodb/strategy/Mongo4VersionStrategy.java    |  92 ++++++--
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  91 +++++---
 .../action/cdc/mongodb/JsonParserUtilsTest.java    |  76 -------
 .../flink/action/cdc/mongodb/MongoDBContainer.java |  11 -
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   |   2 +-
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  |   2 +-
 22 files changed, 799 insertions(+), 532 deletions(-)

diff --git a/LICENSE b/LICENSE
index 74613fb9d..75d02b53a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -222,6 +222,9 @@ from https://github.com/aws/aws-sdk-java version 1.12.319
 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 from https://ververica.github.io/flink-cdc-connectors/ version 2.3.0
 
+paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+from https://ververica.github.io/flink-cdc-connectors/ version 2.4.0
+
 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 from https://github.com/apache/doris-flink-connector/ version 1.4.0
 
diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index b7158e335..670827d3b 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -449,6 +449,9 @@ To use this feature through `flink run`, run the following 
shell command.
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
 ```
+
+{{< generated/mongodb_sync_table >}}
+
 Here are a few points to take note of:
 
 1. The "mongodb-conf" introduces the "schema.start.mode" parameter on top of 
the MongoDB CDC source configuration. "schema.start.mode" provides two modes: 
"dynamic" (default) and "specified".
@@ -459,6 +462,56 @@ The difference between the two is that the "specify" mode 
requires the user to e
 Dynamic mode, on the other hand, ensures that Paimon and MongoDB always keep 
the top-level fields consistent, eliminating the need to focus on specific 
fields.
 Further processing of the data table is required when using values from nested 
fields.
 
+{{< generated/mongodb_operator >}}
+
+
+Functions can be invoked at the tail end of a path - the input to a function 
is the output of the path expression. The function output is dictated by the 
function itself.
+
+{{< generated/mongodb_functions >}}
+
+Path Examples
+```json
+{
+    "store": {
+        "book": [
+            {
+                "category": "reference",
+                "author": "Nigel Rees",
+                "title": "Sayings of the Century",
+                "price": 8.95
+            },
+            {
+                "category": "fiction",
+                "author": "Evelyn Waugh",
+                "title": "Sword of Honour",
+                "price": 12.99
+            },
+            {
+                "category": "fiction",
+                "author": "Herman Melville",
+                "title": "Moby Dick",
+                "isbn": "0-553-21311-3",
+                "price": 8.99
+            },
+            {
+                "category": "fiction",
+                "author": "J. R. R. Tolkien",
+                "title": "The Lord of the Rings",
+                "isbn": "0-395-19395-8",
+                "price": 22.99
+            }
+        ],
+        "bicycle": {
+            "color": "red",
+            "price": 19.95
+        }
+    },
+    "expensive": 10
+}
+```
+
+{{< generated/mongodb_path_example >}}
+
 2. The synchronized table is required to have its primary key set as `_id`. 
 This is because MongoDB's change events are recorded before updates in 
messages. 
 Consequently, we can only convert them into Flink's UPSERT change log stream. 
@@ -509,7 +562,7 @@ Example 2: Synchronize collection into a Paimon table 
according to the specified
     --mongodb-conf collection=source_table1 \
     --mongodb-conf schema.start.mode=specified \
     --mongodb-conf field.name=_id,name,description \
-    --mongodb-conf parser.path=_id,name,description \
+    --mongodb-conf parser.path=$._id,$.name,$.description \
     --catalog-conf metastore=hive \
     --catalog-conf uri=thrift://hive-metastore:9083 \
     --table-conf bucket=4 \
@@ -538,6 +591,8 @@ To use this feature through `flink run`, run the following 
shell command.
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
 ```
 
+{{< generated/mongodb_sync_database >}}
+
 All collections to be synchronized need to set _id as the primary key.
 For each MongoDB collection to be synchronized, if the corresponding Paimon 
table does not exist, this action will automatically create the table. 
 Its schema will be derived from all specified MongoDB collection. If the 
Paimon table already exists, its schema will be compared against the schema of 
all specified MongoDB collection.
diff --git a/docs/layouts/shortcodes/generated/mongodb_functions.html 
b/docs/layouts/shortcodes/generated/mongodb_functions.html
new file mode 100644
index 000000000..a10f41d16
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_functions.html
@@ -0,0 +1,95 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Function</th>
+        <th class="text-left" style="width: 70%">Description</th>
+        <th class="text-left" style="width: 15%">Output type</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>min()</h5></td>
+        <td>Provides the min value of an array of numbers.</td>
+        <td>Double</td>
+    </tr>
+    <tr>
+        <td><h5>max()</h5></td>
+        <td>Provides the max value of an array of numbers.</td>
+        <td>Double</td>
+    </tr>
+    <tr>
+        <td><h5>avg()</h5></td>
+        <td>Provides the average value of an array of numbers.</td>
+        <td>Double</td>
+    </tr>
+    <tr>
+        <td><h5>stddev()</h5></td>
+        <td>Provides the standard deviation value of an array of numbers</td>
+        <td>Double</td>
+    </tr>
+    <tr>
+        <td><h5>length()</h5></td>
+        <td>Provides the length of an array</td>
+        <td>Integer</td>
+    </tr>
+    <tr>
+        <td><h5>sum()</h5></td>
+        <td>Provides the sum value of an array of numbers.</td>
+        <td>Double</td>
+    </tr>
+    <tr>
+        <td><h5>keys()</h5></td>
+        <td>Provides the property keys (An alternative for terminal tilde 
~)</td>
+        <td>Set</td>
+    </tr>
+    <tr>
+        <td><h5>concat(X)</h5></td>
+        <td>Provides a concatenated version of the path output with a new 
item.</td>
+        <td>like input</td>
+    </tr>
+    <tr>
+        <td><h5>append(X)</h5></td>
+        <td>add an item to the json path output array</td>
+        <td>like input</td>
+    </tr>
+    <tr>
+        <td><h5>append(X)</h5></td>
+        <td>add an item to the json path output array</td>
+        <td>like input</td>
+    </tr>
+    <tr>
+        <td><h5>first()</h5></td>
+        <td>Provides the first item of an array</td>
+        <td>Depends on the array</td>
+    </tr>
+    <tr>
+        <td><h5>last()</h5></td>
+        <td>Provides the last item of an array</td>
+        <td>Depends on the array</td>
+    </tr>
+    <tr>
+        <td><h5>index(X)</h5></td>
+        <td>Provides the item of an array at index X; if X is negative, 
the index is counted from the end of the array</td>
+        <td>Depends on the array</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/mongodb_operator.html 
b/docs/layouts/shortcodes/generated/mongodb_operator.html
new file mode 100644
index 000000000..2f4f99197
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_operator.html
@@ -0,0 +1,65 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Operator</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>$</h5></td>
+        <td>The root element to query. This starts all path expressions.</td>
+    </tr>
+    <tr>
+        <td><h5>@</h5></td>
+        <td>The current node being processed by a filter predicate.</td>
+    </tr>
+    <tr>
+        <td><h5>*</h5></td>
+        <td>Wildcard. Available anywhere a name or numeric are required.</td>
+    </tr>
+    <tr>
+        <td><h5>..</h5></td>
+        <td>Deep scan. Available anywhere a name is required.</td>
+    </tr>
+    <tr>
+        <td><h5>.</h5></td>
+        <td>Dot-notated child.</td>
+    </tr>
+    <tr>
+        <td><h5>['{name}' (, '{name}')]</h5></td>
+        <td>Bracket-notated child or children.</td>
+    </tr>
+    <tr>
+        <td><h5>[{number} (, {number})]</h5></td>
+        <td>Bracket-notated child or children.</td>
+    </tr>
+    <tr>
+        <td><h5>[start:end]</h5></td>
+        <td>Array index or indexes.</td>
+    </tr>
+    <tr>
+        <td><h5>[?({expression})]</h5></td>
+        <td>Filter expression. Expression must evaluate to a boolean 
value.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/mongodb_path_example.html 
b/docs/layouts/shortcodes/generated/mongodb_path_example.html
new file mode 100644
index 000000000..fb1c6a874
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_path_example.html
@@ -0,0 +1,97 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">JsonPath</th>
+        <th class="text-left" style="width: 85%">Result</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>$.store.book[*].author</h5></td>
+        <td>The authors of all books.</td>
+    </tr>
+    <tr>
+        <td><h5>$..author</h5></td>
+        <td>All authors.</td>
+    </tr>
+    <tr>
+        <td><h5>$.store.*</h5></td>
+        <td>All things, both books and bicycles.</td>
+    </tr>
+    <tr>
+        <td><h5>$.store..price</h5></td>
+        <td>The price of everything.</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[2]</h5></td>
+        <td>The third book.</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[-2]</h5></td>
+        <td>The second to last book.</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[0,1]</h5></td>
+        <td>The first two books.</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[:2]</h5></td>
+        <td>All books from index 0 (inclusive) until index 2 (exclusive).</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[1:2]</h5></td>
+        <td>All books from index 1 (inclusive) until index 2 (exclusive)</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[-2:]</h5></td>
+        <td>Last two books</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[2:]</h5></td>
+        <td>All books from index 2 (inclusive) to last</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[?(@.isbn)]</h5></td>
+        <td>All books with an ISBN number</td>
+    </tr>
+    <tr>
+        <td><h5>$.store.book[?(@.price &lt; 10)]</h5></td>
+        <td>All books in store cheaper than 10</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[?(@.price &lt;= $['expensive'])]</h5></td>
+        <td>All books in store that are not "expensive"</td>
+    </tr>
+    <tr>
+        <td><h5>$..book[?(@.author =~ /.*REES/i)]</h5></td>
+        <td>All books matching regex (ignore case)</td>
+    </tr>
+    <tr>
+        <td><h5>$..*</h5></td>
+        <td>Give me every thing</td>
+    </tr>
+    <tr>
+        <td><h5>$..book.length()</h5></td>
+        <td>The number of books</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html 
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
new file mode 100644
index 000000000..a7c9ae94f
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -0,0 +1,65 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-prefix</h5></td>
+        <td>The prefix of all Paimon tables to be synchronized. For example, 
if you want all synchronized tables to have "ods_" as prefix, you can specify 
"--table-prefix ods_".</td>
+    </tr>
+    <tr>
+        <td><h5>--table-suffix</h5></td>
+        <td>The suffix of all Paimon tables to be synchronized. The usage is 
same as "--table-prefix".</td>
+    </tr>
+    <tr>
+        <td><h5>--including-tables</h5></td>
+        <td>It is used to specify which source tables are to be synchronized. 
You must use '|' to separate multiple tables. Because '|' is a special 
character, the value must be quoted, for example: 'a|b|c'. Regular expression is 
supported, for example, specifying "--including-tables test|paimon.*" means to 
synchronize table 'test' and all tables that start with 'paimon'.</td>
+    </tr>
+    <tr>
+        <td><h5>--excluding-tables</h5></td>
+        <td>It is used to specify which source tables are not to be 
synchronized. The usage is same as "--including-tables". "--excluding-tables" 
has higher priority than "--including-tables" if you specified both.</td>
+    </tr>
+    <tr>
+        <td><h5>--mongodb-conf</h5></td>
+        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--catalog-conf</h5></td>
+        <td>The configuration for Paimon catalog. Each configuration should be 
specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of catalog configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-conf</h5></td>
+        <td>The configuration for Paimon table sink. Each configuration should 
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of table configurations.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_table.html 
b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
new file mode 100644
index 000000000..440580eb7
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
@@ -0,0 +1,57 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--table</h5></td>
+        <td>The Paimon table name.</td>
+    </tr>
+    <tr>
+        <td><h5>--partition-keys</h5></td>
+        <td>The partition keys for Paimon table. If there are multiple 
partition keys, connect them with comma, for example "dt,hh,mm".</td>
+    </tr>
+    <tr>
+        <td><h5>--mongodb-conf</h5></td>
+        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--catalog-conf</h5></td>
+        <td>The configuration for Paimon catalog. Each configuration should be 
specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of catalog configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-conf</h5></td>
+        <td>The configuration for Paimon table sink. Each configuration should 
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of table configurations.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 0ffd1fb32..4ea553249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -27,6 +27,7 @@ import org.apache.paimon.types.DataTypeJsonParser;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.Module;
@@ -38,7 +39,10 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** A utility class that provide abilities for JSON serialization and 
deserialization. */
 public class JsonSerdeUtil {
@@ -54,6 +58,25 @@ public class JsonSerdeUtil {
         OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule());
     }
 
+    public static <V> LinkedHashMap<String, V> parseJsonMap(String jsonString, 
Class<V> valueType) {
+        try {
+            LinkedHashMap<String, Object> originalMap =
+                    OBJECT_MAPPER_INSTANCE.readValue(
+                            jsonString, new 
TypeReference<LinkedHashMap<String, Object>>() {});
+            return originalMap.entrySet().stream()
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry ->
+                                            
OBJECT_MAPPER_INSTANCE.convertValue(
+                                                    entry.getValue(), 
valueType),
+                                    (oldValue, newValue) -> oldValue,
+                                    LinkedHashMap::new));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Error parsing JSON string", e);
+        }
+    }
+
     /**
      * Retrieves a specific node from the given root node and casts it to the 
specified type.
      *
@@ -61,7 +84,7 @@ public class JsonSerdeUtil {
      * @param root The root node from which the specific node is to be 
retrieved.
      * @param fieldName The name of the field to retrieve.
      * @param clazz The class of the node to be returned.
-     * @return The node casted to the specified type.
+     * @return The node cast to the specified type.
      * @throws IllegalArgumentException if the node is not present or if it's 
not of the expected
      *     type.
      */
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 53ff1344d..200e3026d 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -40,6 +40,7 @@ under the License.
         <geometry.version>2.2.0</geometry.version>
         <avro.version>1.11.1</avro.version>
         <mongodb.testcontainers.version>1.18.3</mongodb.testcontainers.version>
+        <json-path.version>2.8.0</json-path.version>
     </properties>
 
     <dependencies>
@@ -174,6 +175,12 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>${json-path.version}</version>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
deleted file mode 100644
index 61340542d..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.mongodb;
-
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.type.MapType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * specified, and return json string of the extracted json object. It will 
return null if the input
- * json string is invalid. A limited version of JSONPath supported: $ : Root 
object . : Child
- * operator [] : Subscript operator for array * : Wildcard for [] Syntax not 
supported that's worth
- * noticing: '' : Zero length string as key .. : Recursive descent &amp;#064; 
: Current
- * object/element () : Script expression ?() : Filter (script) expression. [,] 
: Union operator
- * [start:end:step] : array slice operator.
- */
-@SuppressWarnings("unchecked")
-public class JsonParserUtils implements Serializable {
-
-    private static final Pattern patternKey = 
Pattern.compile("^([a-zA-Z0-9_\\-:\\s]+).*");
-    private static final Pattern patternIndex = 
Pattern.compile("\\[([0-9]+|\\*)]");
-    private static final JsonFactory JSON_FACTORY = new JsonFactory();
-
-    static {
-        // Allows for unescaped ASCII control characters in JSON values
-        JSON_FACTORY.enable(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
-    }
-
-    private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
-    private static final MapType MAP_TYPE =
-            MAPPER.getTypeFactory().constructMapType(Map.class, String.class, 
Object.class);
-
-    // An LRU cache using a linked hash map
-    static class HashCache<K, V> extends LinkedHashMap<K, V> {
-
-        private static final int CACHE_SIZE = 16;
-        private static final int INIT_SIZE = 32;
-        private static final float LOAD_FACTOR = 0.6f;
-
-        HashCache() {
-            super(INIT_SIZE, LOAD_FACTOR);
-        }
-
-        private static final long serialVersionUID = 1;
-
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-            return size() > CACHE_SIZE;
-        }
-    }
-
-    static Map<String, Object> extractObjectCache = new HashCache<>();
-    static Map<String, String[]> pathExprCache = new HashCache<>();
-    static Map<String, ArrayList<String>> indexListCache = new HashCache<>();
-    static Map<String, String> mKeyGroupCache = new HashCache<>();
-    static Map<String, Boolean> mKeyMatchesCache = new HashCache<>();
-
-    public static String evaluate(String jsonString, String pathString) {
-
-        if (jsonString == null
-                || jsonString.equals("")
-                || pathString == null
-                || pathString.equals("")) {
-            return null;
-        }
-
-        // Cache pathExpr
-        String[] pathExpr = pathExprCache.computeIfAbsent(pathString, s -> 
s.split("\\.", -1));
-
-        if (!pathExpr[0].equalsIgnoreCase("$")) {
-            return null;
-        }
-        // Cache extractObject
-        Object extractObject = extractObjectCache.get(jsonString);
-        if (extractObject == null) {
-            try {
-                extractObject = MAPPER.readValue(jsonString, MAP_TYPE);
-            } catch (Exception e) {
-                return null;
-            }
-            extractObjectCache.put(jsonString, extractObject);
-        }
-        for (int i = 1; i < pathExpr.length; i++) {
-            if (extractObject == null) {
-                return null;
-            }
-            extractObject = extract(extractObject, pathExpr[i]);
-        }
-        if (extractObject instanceof Map || extractObject instanceof List) {
-            try {
-                return MAPPER.writeValueAsString(extractObject);
-            } catch (Exception e) {
-                return null;
-            }
-        } else if (extractObject != null) {
-            return extractObject.toString();
-        } else {
-            return null;
-        }
-    }
-
-    private static Object extract(Object json, String path) {
-
-        // Cache patternkey.matcher(path).matches()
-        Matcher mKey = null;
-        Boolean mKeyMatches = mKeyMatchesCache.get(path);
-        if (mKeyMatches == null) {
-            mKey = patternKey.matcher(path);
-            mKeyMatches = mKey.matches() ? Boolean.TRUE : Boolean.FALSE;
-            mKeyMatchesCache.put(path, mKeyMatches);
-        }
-        if (!mKeyMatches) {
-            return null;
-        }
-
-        // Cache mkey.group(1)
-        String mKeyGroup1 = mKeyGroupCache.get(path);
-        if (mKeyGroup1 == null) {
-            if (mKey == null) {
-                mKey = patternKey.matcher(path);
-            }
-            mKeyGroup1 = mKey.group(1);
-            mKeyGroupCache.put(path, mKeyGroup1);
-        }
-        json = extract_json_key(json, mKeyGroup1);
-
-        // Cache indexList
-        ArrayList<String> indexList = indexListCache.get(path);
-        if (indexList == null) {
-            Matcher mIndex = patternIndex.matcher(path);
-            indexList = new ArrayList<>();
-            while (mIndex.find()) {
-                indexList.add(mIndex.group(1));
-            }
-            indexListCache.put(path, indexList);
-        }
-
-        if (indexList.size() > 0) {
-            json = extract_json_withIndex(json, indexList);
-        }
-
-        return json;
-    }
-
-    private static Object extract_json_withIndex(Object json, 
ArrayList<String> indexList) {
-        List<Object> jsonList = new ArrayList<>();
-        jsonList.add(json);
-        for (String index : indexList) {
-            List<Object> tmpJsonList = new ArrayList<>();
-            if (index.equalsIgnoreCase("*")) {
-                for (Object array : jsonList) {
-                    if (array instanceof List) {
-                        tmpJsonList.addAll((List<Object>) array);
-                    }
-                }
-                jsonList = tmpJsonList;
-            } else {
-                for (int i = 0; i < (jsonList).size(); i++) {
-                    Object array = jsonList.get(i);
-                    int indexValue = Integer.parseInt(index);
-                    if (!(array instanceof List)) {
-                        continue;
-                    }
-                    if (indexValue >= ((List<Object>) array).size()) {
-                        return null;
-                    }
-                    tmpJsonList.add(((List<Object>) array).get(indexValue));
-                    jsonList = tmpJsonList;
-                }
-            }
-        }
-        if (jsonList.isEmpty()) {
-            return null;
-        }
-        return (jsonList.size() > 1) ? new ArrayList<>(jsonList) : 
jsonList.get(0);
-    }
-
-    private static Object extract_json_key(Object json, String path) {
-        if (json instanceof List) {
-            List<Object> jsonArray = new ArrayList<>();
-            for (int i = 0; i < ((List<Object>) json).size(); i++) {
-                Object jsonElem = ((List<Object>) json).get(i);
-                Object jsonObj;
-                if (jsonElem instanceof Map) {
-                    jsonObj = ((Map<String, Object>) jsonElem).get(path);
-                } else {
-                    continue;
-                }
-                if (jsonObj instanceof List) {
-                    jsonArray.addAll((List<Object>) jsonObj);
-                } else if (jsonObj != null) {
-                    jsonArray.add(jsonObj);
-                }
-            }
-            return (jsonArray.size() == 0) ? null : jsonArray;
-        } else if (json instanceof Map) {
-            return ((Map<String, Object>) json).get(path);
-        } else {
-            return null;
-        }
-    }
-
-    public static LinkedHashMap<String, String> extractMap(String jsonString) {
-        try {
-            LinkedHashMap<String, ?> originalMap = 
MAPPER.readValue(jsonString, MAP_TYPE);
-            LinkedHashMap<String, String> stringMap = new LinkedHashMap<>();
-
-            originalMap.forEach(
-                    (key, value) -> {
-                        String stringValue = Objects.toString(value, null);
-                        stringMap.put(key.toLowerCase(), stringValue);
-                    });
-
-            return stringMap;
-
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 1b062a8a1..5870db179 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -23,9 +23,6 @@ import org.apache.paimon.types.DataType;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoDatabase;
 import com.ververica.cdc.connectors.base.options.SourceOptions;
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
@@ -36,38 +33,59 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.bson.Document;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Utils for MongoDB Action. */
+/**
+ * Utility class for MongoDB-related actions.
+ *
+ * <p>This class provides a set of utility methods to facilitate the creation 
and configuration of
+ * MongoDB sources, as well as the construction of Paimon schemas based on 
MongoDB schemas. It also
+ * includes methods for validating MongoDB configurations.
+ *
+ * <p>Key functionalities include:
+ *
+ * <ul>
+ *   <li>Building MongoDB sources with various configurations.
+ *   <li>Constructing Paimon schemas based on MongoDB schemas.
+ *   <li>Validating essential MongoDB configurations.
+ * </ul>
+ *
+ * <p>Note: This utility class is designed to be used in conjunction with 
Flink and Paimon
+ * integrations.
+ */
 public class MongoDBActionUtils {
 
+    private static final String INITIAL_MODE = "initial";
+    private static final String LATEST_OFFSET_MODE = "latest-offset";
+    private static final String TIMESTAMP_MODE = "timestamp";
+    private static final String PRIMARY_KEY = "_id";
+
     public static final ConfigOption<String> FIELD_NAME =
             ConfigOptions.key("field.name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Set the field names to be synchronized in the  
`specified` mode.");
+                    .withDescription("Field names to synchronize when in 
`specified` mode.");
 
     public static final ConfigOption<String> PARSER_PATH =
             ConfigOptions.key("parser.path")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Configure the JSON parsing path for synchronizing 
field values in the `specified` mode.");
+                            "JSON parsing path for field synchronization in 
`specified` mode.");
 
     public static final ConfigOption<String> START_MODE =
             ConfigOptions.key("schema.start.mode")
                     .stringType()
                     .defaultValue("dynamic")
-                    .withDescription("Can choose between the `dynamic` and 
`specified` modes.");
+                    .withDescription("Mode selection: `dynamic` or 
`specified`.");
 
     static MongoDBSource<String> buildMongodbSource(Configuration 
mongodbConfig, String tableList) {
         validateMongodbConfig(mongodbConfig);
@@ -100,14 +118,20 @@ public class MongoDBActionUtils {
                 .collectionList(tableList);
 
         String startupMode = 
mongodbConfig.get(SourceOptions.SCAN_STARTUP_MODE);
-        if ("initial".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(StartupOptions.initial());
-        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(StartupOptions.latest());
-        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
-            sourceBuilder.startupOptions(
-                    StartupOptions.timestamp(
-                            
mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+        switch (startupMode.toLowerCase()) {
+            case INITIAL_MODE:
+                sourceBuilder.startupOptions(StartupOptions.initial());
+                break;
+            case LATEST_OFFSET_MODE:
+                sourceBuilder.startupOptions(StartupOptions.latest());
+                break;
+            case TIMESTAMP_MODE:
+                sourceBuilder.startupOptions(
+                        StartupOptions.timestamp(
+                                
mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported startup mode: 
" + startupMode);
         }
 
         Map<String, Object> customConverterConfigs = new HashMap<>();
@@ -136,48 +160,34 @@ public class MongoDBActionUtils {
             List<String> specifiedPartitionKeys,
             Map<String, String> paimonConfig,
             boolean caseSensitive) {
-        Schema.Builder builder = Schema.newBuilder();
-        builder.options(paimonConfig);
-
-        Map<String, DataType> mongodbFields;
-        if (caseSensitive) {
-            mongodbFields = mongodbSchema.fields();
-        } else {
-            mongodbFields = new LinkedHashMap<>();
-            for (Map.Entry<String, DataType> entry : 
mongodbSchema.fields().entrySet()) {
-                String fieldName = entry.getKey();
-                checkArgument(
-                        !mongodbFields.containsKey(fieldName.toLowerCase()),
-                        String.format(
-                                "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
-                                fieldName, mongodbSchema.tableName()));
-                mongodbFields.put(fieldName.toLowerCase(), entry.getValue());
-            }
-        }
 
-        for (Map.Entry<String, DataType> entry : mongodbFields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue());
-        }
-
-        builder.primaryKey(Lists.newArrayList("_id"));
-
-        if (specifiedPartitionKeys.size() > 0) {
+        Schema.Builder builder = Schema.newBuilder().options(paimonConfig);
+
+        Map<String, DataType> mongodbFields =
+                caseSensitive
+                        ? mongodbSchema.fields()
+                        : mongodbSchema.fields().entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                entry -> 
entry.getKey().toLowerCase(),
+                                                Map.Entry::getValue,
+                                                (existing, replacement) -> {
+                                                    throw new 
IllegalArgumentException(
+                                                            String.format(
+                                                                    "Duplicate 
key '%s' in table '%s' appears when converting fields map keys to 
case-insensitive form.",
+                                                                    existing,
+                                                                    
mongodbSchema.tableName()));
+                                                },
+                                                LinkedHashMap::new));
+
+        mongodbFields.forEach(builder::column);
+
+        builder.primaryKey(Lists.newArrayList(PRIMARY_KEY));
+
+        if (!specifiedPartitionKeys.isEmpty()) {
             builder.partitionKeys(specifiedPartitionKeys);
         }
 
         return builder.build();
     }
-
-    public static int getMongoDBVersion(Configuration mongodbConfig) {
-        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
-        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
-
-        String url = String.format("mongodb://%s/%s", hosts, databaseName);
-        try (MongoClient mongoClient = MongoClients.create(url)) {
-            MongoDatabase database = mongoClient.getDatabase(databaseName);
-            Document buildInfo = database.runCommand(new Document("buildInfo", 
1));
-            String[] split = ((String) buildInfo.get("version")).split("\\.");
-            return Integer.parseInt(split[0]);
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index b0c06a627..a8aad81af 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -30,19 +30,38 @@ import 
org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-/** Convert MongoDB Debezium JSON string to list of {@link 
RichCdcMultiplexRecord}s. */
+/**
+ * A parser for MongoDB Debezium JSON strings, converting them into a list of 
{@link
+ * RichCdcMultiplexRecord}s.
+ *
+ * <p>This parser is designed to process and transform incoming MongoDB 
Debezium JSON records into a
+ * more structured format suitable for further processing. It takes into 
account the version of
+ * MongoDB and applies the appropriate strategy for extracting records.
+ *
+ * <p>Key features include:
+ *
+ * <ul>
+ *   <li>Support for case-sensitive and case-insensitive field names.
+ *   <li>Integration with a configurable table name converter.
+ *   <li>Ability to work with different MongoDB version strategies (e.g., 
Mongo4, Mongo6).
+ * </ul>
+ *
+ * <p>Note: This parser is primarily intended for use in Flink streaming 
applications that process
+ * MongoDB CDC data.
+ */
 public class MongoDBRecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
 
     private static final String FIELD_DATABASE = "db";
     private static final String FIELD_TABLE = "coll";
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String FIELD_NAMESPACE = "ns";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final Configuration mongodbConfig;
     private JsonNode root;
 
     public MongoDBRecordParser(boolean caseSensitive, Configuration 
mongodbConfig) {
-
         this(caseSensitive, new TableNameConverter(caseSensitive), 
mongodbConfig);
     }
 
@@ -57,26 +76,31 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
 
     @Override
     public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) 
throws Exception {
-        root = objectMapper.readValue(value, JsonNode.class);
+        root = OBJECT_MAPPER.readValue(value, JsonNode.class);
         String databaseName = extractString(FIELD_DATABASE);
         String collection = 
tableNameConverter.convert(extractString(FIELD_TABLE));
         MongoVersionStrategy versionStrategy =
-                new Mongo4VersionStrategy(databaseName, collection, 
caseSensitive, mongodbConfig);
-
-        // TODO:Upgrade mongodb cdc to 2.5 and enable scanFullChangelog 
capability
-        // [Full Changelog]
-        // 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html
-        //        if (mongodbVersion >= 6) {
-        //            versionStrategy = new 
Mongo6VersionStrategy(databaseName, collection,
-        // caseSensitive);
-        //        } else {
-        //            versionStrategy = new 
Mongo4VersionStrategy(databaseName, collection,
-        // caseSensitive);
-        //        }
+                VersionStrategyFactory.create(
+                        databaseName, collection, caseSensitive, 
mongodbConfig);
         versionStrategy.extractRecords(root).forEach(out::collect);
     }
 
     private String extractString(String key) {
-        return root.get("ns").get(key).asText();
+        return root.get(FIELD_NAMESPACE).get(key).asText();
+    }
+
+    private static class VersionStrategyFactory {
+        static MongoVersionStrategy create(
+                String databaseName,
+                String collection,
+                boolean caseSensitive,
+                Configuration mongodbConfig) {
+            // TODO: When MongoDB CDC is upgraded to 2.5, uncomment the 
version check logic
+            // if (mongodbVersion >= 6) {
+            //     return new Mongo6VersionStrategy(databaseName, collection, 
caseSensitive);
+            // }
+            return new Mongo4VersionStrategy(
+                    databaseName, collection, caseSensitive, mongodbConfig);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 82c879d89..e7d613512 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
@@ -47,27 +46,23 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
- * An {@link Action} which synchronize the whole MongoDB database into one 
Paimon database.
+ * An action that synchronizes a whole MongoDB database into one Paimon database.
  *
- * <p>You should specify MongoDB source database in {@code mongodbConfig}. See 
<a
- * 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options";>document
- * of flink-cdc-connectors</a> for detailed keys and values.
+ * <p>This class provides functionality to read data from a MongoDB source, process it, and then
+ * synchronize it into Paimon tables. It supports various configurations, including table
+ * prefixes, suffixes, and inclusion/exclusion patterns.
  *
- * <p>For each MongoDB collections to be synchronized, if the corresponding 
Paimon table does not
- * exist, this action will automatically create the table. Its schema will be 
derived from all
- * specified MongoDB collections. If the Paimon table already exists, its 
schema will be compared
- * against the schema of all specified MongoDB collections.
- *
- * <p>This action supports a limited number of schema changes. Currently, the 
framework can not drop
- * columns, so the behaviors of `DROP` will be ignored, `RENAME` will add a 
new column. Currently
- * supported schema changes includes:
+ * <p>Key features include:
  *
  * <ul>
- *   <li>Adding columns.
+ *   <li>Support for case-sensitive and case-insensitive database and table 
names.
+ *   <li>Configurable table name conversion with prefixes and suffixes.
+ *   <li>Ability to include or exclude specific tables using regular 
expressions.
+ *   <li>Integration with Flink's streaming environment for data processing.
  * </ul>
  *
- * <p>To automatically synchronize new table, This action creates a single 
sink for all Paimon
- * tables to be written. See {@link DatabaseSyncMode#COMBINED}.
+ * <p>Note: This action is primarily intended for use in Flink streaming applications that
+ * synchronize MongoDB data into Paimon.
  */
 public class MongoDBSyncDatabaseAction extends ActionBase {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 6283f521d..3aaad32b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -41,22 +40,23 @@ import java.util.Map;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
- * An {@link Action} which synchronize one MongoDB collection into one Paimon 
table.
+ * Represents an action to synchronize one MongoDB collection into one Paimon table.
  *
- * <p>You should specify MongodbDB source topic in {@code mongodbConfig}. See 
<a
- * 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options";>document
- * of flink-connectors</a> for detailed keys and values.
- *
- * <p>If the specified Paimon table does not exist, this action will 
automatically create the table.
- * Its schema will be derived from all specified MonodbDB collection. If the 
Paimon table already
- * exists, its schema will be compared against the schema of all specified 
MonodbDB collection.
- *
- * <p>This action supports a limited number of schema changes. Unsupported 
schema changes will be
- * ignored. Currently supported schema changes includes:
+ * <p>This action is responsible for:
  *
  * <ul>
- *   <li>Adding columns.
+ *   <li>Validating the provided MongoDB configuration.
+ *   <li>Checking and ensuring the existence of the target database and table.
+ *   <li>Setting up the necessary Flink streaming environment for data 
synchronization.
+ *   <li>Handling case sensitivity considerations for database and table names.
  * </ul>
+ *
+ * <p>Usage:
+ *
+ * <pre>
+ * MongoDBSyncTableAction action = new MongoDBSyncTableAction(...);
+ * action.run();
+ * </pre>
  */
 public class MongoDBSyncTableAction extends ActionBase {
     public final Configuration mongodbConfig;
@@ -106,18 +106,21 @@ public class MongoDBSyncTableAction extends ActionBase {
 
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
-        EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
-                RichCdcMultiplexRecordEventParser::new;
-        Schema fromMongodb =
-                MongoDBActionUtils.buildPaimonSchema(
-                        mongodbSchema, partitionKeys, tableConfig, 
caseSensitive);
-        try {
+
+        // Check if table exists before trying to get or create it
+        if (catalog.tableExists(identifier)) {
             table = (FileStoreTable) catalog.getTable(identifier);
-        } catch (Exception e) {
+        } else {
+            Schema fromMongodb =
+                    MongoDBActionUtils.buildPaimonSchema(
+                            mongodbSchema, partitionKeys, tableConfig, 
caseSensitive);
             catalog.createTable(identifier, fromMongodb, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
 
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
+                RichCdcMultiplexRecordEventParser::new;
+
         CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
                 new CdcSinkBuilder<RichCdcMultiplexRecord>()
                         .withInput(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index 80cde22df..8c70ba4c1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -30,6 +31,7 @@ import org.bson.Document;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,12 +39,22 @@ import java.util.Objects;
 
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
-import static 
org.apache.paimon.shade.guava30.com.google.common.collect.Lists.newArrayList;
-import static org.apache.paimon.types.DataTypes.STRING;
 
-/** mongodb schema. */
+/**
+ * Represents the schema of a MongoDB collection.
+ *
+ * <p>This class provides methods to retrieve and manage the schema details of 
a MongoDB collection,
+ * including the database name, table (collection) name, fields, and primary 
keys. The schema can be
+ * acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the 
schema details are
+ * provided explicitly, while in the DYNAMIC mode, the schema is inferred from 
the first document in
+ * the collection.
+ *
+ * <p>The class also provides utility methods to generate schema fields and 
create a new MongoDB
+ * schema instance.
+ */
 public class MongodbSchema {
 
+    private static final String ID_FIELD = "_id";
     private final String databaseName;
     private final String tableName;
     private final Map<String, DataType> fields;
@@ -76,55 +88,60 @@ public class MongodbSchema {
     }
 
     public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
-
-        ModeEnum mode = 
ModeEnum.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
-        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
-        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
-        String collectionName = 
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+        SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
         switch (mode) {
             case SPECIFIED:
-                String[] columnNames = 
mongodbConfig.get(FIELD_NAME).split(",");
-                Map<String, DataType> schemaFields =
-                        generateSchemaFields(Arrays.asList(columnNames));
-                return new MongodbSchema(
-                        databaseName, collectionName, schemaFields, 
newArrayList("_id"));
+                return createSchemaFromSpecifiedConfig(mongodbConfig);
             case DYNAMIC:
-                String url = String.format("mongodb://%s/%s", hosts, 
databaseName);
-                try (MongoClient mongoClient = MongoClients.create(url)) {
-                    MongoDatabase database = 
mongoClient.getDatabase(databaseName);
-                    MongoCollection<Document> collection = 
database.getCollection(collectionName);
-                    Document firstDocument = collection.find().first();
-                    return createMongodbSchema(
-                            databaseName, collectionName, 
getColumnNames(firstDocument));
-                }
+                return createSchemaFromDynamicConfig(mongodbConfig);
             default:
-                throw new RuntimeException();
+                throw new IllegalArgumentException("Unsupported schema 
acquisition mode: " + mode);
         }
     }
 
-    private static List<String> getColumnNames(Document document) {
-        if (document != null) {
-            return new ArrayList<>(document.keySet());
+    private static SchemaAcquisitionMode getModeFromConfig(Configuration 
mongodbConfig) {
+        return 
SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
+    }
+
+    private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration 
mongodbConfig) {
+        String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
+        Map<String, DataType> schemaFields = 
generateSchemaFields(Arrays.asList(columnNames));
+        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+        String collectionName = 
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+        return new MongodbSchema(
+                databaseName, collectionName, schemaFields, 
Collections.singletonList(ID_FIELD));
+    }
+
+    private static MongodbSchema createSchemaFromDynamicConfig(Configuration 
mongodbConfig) {
+        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
+        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+        String collectionName = 
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+        String url = String.format("mongodb://%s/%s", hosts, databaseName);
+        try (MongoClient mongoClient = MongoClients.create(url)) {
+            MongoDatabase database = mongoClient.getDatabase(databaseName);
+            MongoCollection<Document> collection = 
database.getCollection(collectionName);
+            Document firstDocument = collection.find().first();
+            return createMongodbSchema(databaseName, collectionName, 
getColumnNames(firstDocument));
         }
-        return null;
+    }
+
+    private static List<String> getColumnNames(Document document) {
+        return document != null ? new ArrayList<>(document.keySet()) : 
Collections.emptyList();
     }
 
     private static Map<String, DataType> generateSchemaFields(List<String> 
columnNames) {
         Map<String, DataType> schemaFields = new LinkedHashMap<>();
-
-        if (columnNames != null) {
-            for (String columnName : columnNames) {
-                schemaFields.put(columnName, STRING());
-            }
+        for (String columnName : columnNames) {
+            schemaFields.put(columnName, DataTypes.STRING());
         }
-
         return schemaFields;
     }
 
     private static MongodbSchema createMongodbSchema(
             String databaseName, String collectionName, List<String> 
columnNames) {
         Map<String, DataType> schemaFields = generateSchemaFields(columnNames);
-        return new MongodbSchema(databaseName, collectionName, schemaFields, 
newArrayList("_id"));
+        return new MongodbSchema(
+                databaseName, collectionName, schemaFields, 
Collections.singletonList(ID_FIELD));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
similarity index 77%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
index 926999fb1..195eaa962 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
@@ -18,8 +18,12 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-/** schema acquisition mode. */
-public enum ModeEnum {
+/** Enum representing the modes of schema acquisition. */
+public enum SchemaAcquisitionMode {
+
+    /** Represents a mode where the schema is specified explicitly. */
     SPECIFIED,
+
+    /** Represents a mode where the schema is determined dynamically based on 
the data. */
     DYNAMIC
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 1304ade16..22887109a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -33,7 +33,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** extract record implementation class 6.x>Mongodb Version>4.x. */
+/**
+ * Implementation class for extracting records from MongoDB versions greater 
than 4.x and less than
+ * 6.x.
+ */
 public class Mongo4VersionStrategy implements MongoVersionStrategy {
 
     private static final String FIELD_TYPE = "operationType";
@@ -58,42 +61,87 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
         this.mongodbConfig = mongodbConfig;
     }
 
+    /**
+     * Extracts records from the provided JsonNode based on the MongoDB 
version strategy.
+     *
+     * @param root The root JsonNode containing the MongoDB record.
+     * @return A list of RichCdcMultiplexRecord extracted from the root node.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
     @Override
     public List<RichCdcMultiplexRecord> extractRecords(JsonNode root)
             throws JsonProcessingException {
+        String op = root.get(FIELD_TYPE).asText();
+        JsonNode fullDocument = root.get(FIELD_DATA);
+        return handleOperation(op, fullDocument);
+    }
+
+    /**
+     * Handles the MongoDB operation type and processes the document 
accordingly.
+     *
+     * @param op The operation type (e.g., insert, update, replace).
+     * @param fullDocument The JsonNode representing the full MongoDB document.
+     * @return A list of RichCdcMultiplexRecord based on the operation type.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
+    private List<RichCdcMultiplexRecord> handleOperation(String op, JsonNode 
fullDocument)
+            throws JsonProcessingException {
         List<RichCdcMultiplexRecord> records = new ArrayList<>();
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
 
-        String op = root.get(FIELD_TYPE).asText();
-        JsonNode fullDocument = root.get(FIELD_DATA);
-        // extract row kind and field values
         switch (op) {
             case OP_INSERT:
-                Map<String, String> insert =
-                        getExtractRow(fullDocument, paimonFieldTypes, 
caseSensitive, mongodbConfig);
-                records.add(
-                        new RichCdcMultiplexRecord(
-                                databaseName,
-                                collection,
-                                paimonFieldTypes,
-                                extractPrimaryKeys(),
-                                new CdcRecord(RowKind.INSERT, insert)));
+                records.add(handleInsert(fullDocument, paimonFieldTypes));
                 break;
             case OP_REPLACE:
             case OP_UPDATE:
-                Map<String, String> after =
-                        getExtractRow(fullDocument, paimonFieldTypes, 
caseSensitive, mongodbConfig);
-                records.add(
-                        new RichCdcMultiplexRecord(
-                                databaseName,
-                                collection,
-                                paimonFieldTypes,
-                                extractPrimaryKeys(),
-                                new CdcRecord(RowKind.UPDATE_AFTER, after)));
+                records.add(handleUpdateOrReplace(fullDocument, 
paimonFieldTypes));
                 break;
             default:
                 throw new UnsupportedOperationException("Unknown record type: 
" + op);
         }
         return records;
     }
+
+    /**
+     * Processes the insert operation and constructs a RichCdcMultiplexRecord.
+     *
+     * @param fullDocument The JsonNode representing the full MongoDB document 
for insertion.
+     * @param paimonFieldTypes A map to store the field types.
+     * @return A RichCdcMultiplexRecord representing the insert operation.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
+    private RichCdcMultiplexRecord handleInsert(
+            JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
+            throws JsonProcessingException {
+        Map<String, String> insert =
+                getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, 
mongodbConfig);
+        return new RichCdcMultiplexRecord(
+                databaseName,
+                collection,
+                paimonFieldTypes,
+                extractPrimaryKeys(),
+                new CdcRecord(RowKind.INSERT, insert));
+    }
+
+    /**
+     * Processes the update or replace operation and constructs a 
RichCdcMultiplexRecord.
+     *
+     * @param fullDocument The JsonNode representing the full MongoDB document 
for update/replace.
+     * @param paimonFieldTypes A map to store the field types.
+     * @return A RichCdcMultiplexRecord representing the update or replace 
operation.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
+    private RichCdcMultiplexRecord handleUpdateOrReplace(
+            JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
+            throws JsonProcessingException {
+        Map<String, String> after =
+                getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, 
mongodbConfig);
+        return new RichCdcMultiplexRecord(
+                databaseName,
+                collection,
+                paimonFieldTypes,
+                extractPrimaryKeys(),
+                new CdcRecord(RowKind.UPDATE_AFTER, after));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 6d19ed6e4..ffda81304 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -18,46 +18,57 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 
-import org.apache.paimon.flink.action.cdc.mongodb.JsonParserUtils;
-import org.apache.paimon.flink.action.cdc.mongodb.ModeEnum;
+import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import com.jayway.jsonpath.JsonPath;
 import org.apache.flink.configuration.Configuration;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Optional;
 
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.PARSER_PATH;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Processing strategies for different mongodb versions. */
+/** Interface for processing strategies tailored for different MongoDB 
versions. */
 public interface MongoVersionStrategy {
 
     ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+    /**
+     * Extracts records from the provided JsonNode.
+     *
+     * @param root The root JsonNode containing the MongoDB record.
+     * @return A list of RichCdcMultiplexRecord extracted from the root node.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
     List<RichCdcMultiplexRecord> extractRecords(JsonNode root) throws 
JsonProcessingException;
 
+    /**
+     * Extracts primary keys from the MongoDB record.
+     *
+     * @return A list of primary keys.
+     */
     default List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        primaryKeys.add("_id");
-        return primaryKeys;
+        return Collections.singletonList("_id");
     }
 
     default Map<String, String> extractRow(String record) {
-        return JsonParserUtils.extractMap(record);
+        return JsonSerdeUtil.parseJsonMap(record, String.class);
     }
 
     default Map<String, String> keyCaseInsensitive(Map<String, String> origin) 
{
@@ -73,19 +84,30 @@ public interface MongoVersionStrategy {
         return keyCaseInsensitive;
     }
 
+    /**
+     * Determines the extraction mode and retrieves the row accordingly.
+     *
+     * @param jsonNode The JsonNode representing the MongoDB document.
+     * @param paimonFieldTypes A map to store the field types.
+     * @param caseSensitive Flag indicating if the extraction should be 
case-sensitive.
+     * @param mongodbConfig Configuration for the MongoDB connection.
+     * @return A map representing the extracted row.
+     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     */
     default Map<String, String> getExtractRow(
             JsonNode jsonNode,
             LinkedHashMap<String, DataType> paimonFieldTypes,
             boolean caseSensitive,
             Configuration mongodbConfig)
             throws JsonProcessingException {
-        ModeEnum mode = 
ModeEnum.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
+        SchemaAcquisitionMode mode =
+                
SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
         ObjectNode objectNode = (ObjectNode) 
OBJECT_MAPPER.readTree(jsonNode.asText());
         JsonNode document = objectNode.set("_id", 
objectNode.get("_id").get("$oid"));
         switch (mode) {
             case SPECIFIED:
                 Map<String, String> specifiedRow =
-                        getSpecifiedRow(
+                        parseFieldsFromJsonRecord(
                                 document.toString(),
                                 mongodbConfig.getString(PARSER_PATH),
                                 mongodbConfig.getString(FIELD_NAME),
@@ -93,40 +115,51 @@ public interface MongoVersionStrategy {
                 return caseSensitive ? specifiedRow : 
keyCaseInsensitive(specifiedRow);
             case DYNAMIC:
                 Map<String, String> dynamicRow =
-                        getDynamicRow(document.toString(), paimonFieldTypes);
+                        parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes);
                 return caseSensitive ? dynamicRow : 
keyCaseInsensitive(dynamicRow);
             default:
-                throw new RuntimeException();
+                throw new RuntimeException("Unsupported extraction mode: " + 
mode);
         }
     }
 
-    default Map<String, String> getDynamicRow(
+    /**
+     * Parses a JSON string into a map and updates the data type mapping for 
each key.
+     *
+     * @param evaluate The JSON string to be parsed.
+     * @param paimonFieldTypes A map to store the data types of the keys.
+     * @return A map containing the parsed key-value pairs from the JSON 
string.
+     */
+    default Map<String, String> parseAndTypeJsonRow(
             String evaluate, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
-        Map<String, String> linkedHashMap = 
JsonParserUtils.extractMap(evaluate);
-        Set<String> keySet = linkedHashMap.keySet();
-        String[] columns = keySet.toArray(new String[0]);
-        for (String column : columns) {
+        Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
+        for (String column : parsedMap.keySet()) {
             paimonFieldTypes.put(column, DataTypes.STRING());
         }
         return extractRow(evaluate);
     }
 
-    static Map<String, String> getSpecifiedRow(
+    /**
+     * Parses specified fields from a JSON record.
+     *
+     * @param record The JSON record to be parsed.
+     * @param fieldPaths The paths of the fields to be parsed from the JSON 
record.
+     * @param fieldNames The names of the fields to be returned in the result 
map.
+     * @param paimonFieldTypes A map to store the data types of the fields.
+     * @return A map containing the parsed fields and their values.
+     */
+    static Map<String, String> parseFieldsFromJsonRecord(
             String record,
-            String parsePath,
-            String fileName,
+            String fieldPaths,
+            String fieldNames,
             LinkedHashMap<String, DataType> paimonFieldTypes) {
         Map<String, String> resultMap = new HashMap<>();
-        String[] columnNames = fileName.split(",");
-        String[] parseNames = parsePath.split(",");
+        String[] columnNames = fieldNames.split(",");
+        String[] parseNames = fieldPaths.split(",");
+
         for (int i = 0; i < parseNames.length; i++) {
             paimonFieldTypes.put(columnNames[i], DataTypes.STRING());
-            String evaluate = JsonParserUtils.evaluate(record, "$." + 
parseNames[i]);
-            if (evaluate == null) {
-                resultMap.put(columnNames[i], "{}");
-            } else {
-                resultMap.put(columnNames[i], evaluate);
-            }
+            String evaluate = JsonPath.read(record, parseNames[i]);
+            resultMap.put(columnNames[i], 
Optional.ofNullable(evaluate).orElse("{}"));
         }
         return resultMap;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
deleted file mode 100644
index d1db139e8..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.mongodb;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-/** UT cases for {@link JsonParserUtils}. */
-public class JsonParserUtilsTest {
-
-    @Test
-    public void testGetJsonObject() {
-        String json =
-                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": { \"totalResults\": 1, \"resultsPerPage\": 1 }, \"items\": [ { 
\"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\"  [...]
-        String output = JsonParserUtils.evaluate(json, 
"$.pageInfo.totalResults");
-        assertEquals("1", output);
-    }
-
-    @Test
-    public void testNestedGetJsonObject() {
-        String json =
-                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": { \"pagehit\":{ \"kind\": \"youtube#video\" } ,\"totalResults\": 
1, \"resultsPerPage\": 1 }, \"items\": [ { \"kind\": \"youtube#video\", 
\"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", 
\"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", 
\"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favo [...]
-        String output = JsonParserUtils.evaluate(json, "$.pageInfo.pagehit");
-        assertEquals("{\"kind\":\"youtube#video\"}", output);
-    }
-
-    @Test
-    public void testStringWhenNotJson() {
-        String json =
-                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
-        String output = JsonParserUtils.evaluate(json, 
"$.pageinfo.test_field");
-        assertNull(output);
-    }
-
-    @Test
-    public void testStringWhenJson() {
-        String json =
-                "{ \"kind\": \"youtube#videoListResponse\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", 
\"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": 
\"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": 
\"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": 
\"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": 
\"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
-        String output = JsonParserUtils.evaluate(json, "$.pageInfo");
-        assertEquals("page", output);
-    }
-
-    @Test
-    public void testJsonArray() {
-        String json =
-                "{\"country\":\"Switzerland\",\"languages\": [ \"Italian\" 
],\"religions\": \"christian\"}";
-        String output = JsonParserUtils.evaluate(json, "$.languages[0]");
-        assertEquals("Italian", output);
-    }
-
-    @Test
-    public void testJsonMap() {
-        String json =
-                
"{\"country\":\"Switzerland\",\"languages\":\"Italian\",\"religions\": { \"f\": 
\"v\", \"n\":null} }";
-        String output = JsonParserUtils.evaluate(json, "$.religions.f");
-        assertEquals("v", output);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
index 10eaf9d3f..969a5fc25 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
@@ -48,10 +48,6 @@ public class MongoDBContainer extends 
org.testcontainers.containers.MongoDBConta
 
     public static final int MONGODB_PORT = 27017;
 
-    public static final String FLINK_USER = "flinkuser";
-
-    public static final String FLINK_USER_PASSWORD = 
"a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
-
     public MongoDBContainer(String imageName) {
         super(imageName);
     }
@@ -142,12 +138,6 @@ public class MongoDBContainer extends 
org.testcontainers.containers.MongoDBConta
         }
     }
 
-    /** Executes a mongo command in separate database. */
-    public String executeCommandInSeparateDatabase(String command, String 
baseName) {
-        return executeCommandInDatabase(
-                command, baseName + "_" + Integer.toUnsignedString(new 
Random().nextInt(), 36));
-    }
-
     /** Executes a mongo command file in separate database. */
     public String executeCommandFileInSeparateDatabase(
             String fileNameIgnoreSuffix, String content) {
@@ -171,7 +161,6 @@ public class MongoDBContainer extends 
org.testcontainers.containers.MongoDBConta
         assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
 
         try {
-            // use database;
             String command0 = String.format("db = db.getSiblingDB('%s');\n", 
dbName);
             String command1 =
                     Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 4c5b822cf..ad213b5b2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -36,7 +36,7 @@ import java.util.Map;
 public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
 
     @Test
-    @Timeout(120)
+    @Timeout(60)
     public void testSchemaEvolution() throws Exception {
         writeRecordsToMongoDB("test-data-1", database, "database");
         writeRecordsToMongoDB("test-data-2", database, "database");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 87e5906e7..e62f1c815 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -135,7 +135,7 @@ public class MongoDBSyncTableActionITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.put("database", inventory);
         mongodbConfig.put("collection", "products");
         mongodbConfig.put("field.name", "_id,name,description");
-        mongodbConfig.put("parser.path", "_id,name,description");
+        mongodbConfig.put("parser.path", "$._id,$.name,$.description");
         mongodbConfig.put("schema.start.mode", "specified");
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);


Reply via email to