This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7eb545eb1 [flink] mongodb into the lake related code optimized (#1861)
7eb545eb1 is described below
commit 7eb545eb18185502050ab5ff0f8e6c1db9bfde73
Author: monster <[email protected]>
AuthorDate: Mon Aug 28 22:30:20 2023 +0800
[flink] mongodb into the lake related code optimized (#1861)
---
LICENSE | 3 +
docs/content/how-to/cdc-ingestion.md | 57 ++++-
.../shortcodes/generated/mongodb_functions.html | 95 ++++++++
.../shortcodes/generated/mongodb_operator.html | 65 ++++++
.../shortcodes/generated/mongodb_path_example.html | 97 ++++++++
.../generated/mongodb_sync_database.html | 65 ++++++
.../shortcodes/generated/mongodb_sync_table.html | 57 +++++
.../org/apache/paimon/utils/JsonSerdeUtil.java | 25 ++-
paimon-flink/paimon-flink-common/pom.xml | 7 +
.../flink/action/cdc/mongodb/JsonParserUtils.java | 247 ---------------------
.../action/cdc/mongodb/MongoDBActionUtils.java | 120 +++++-----
.../action/cdc/mongodb/MongoDBRecordParser.java | 58 +++--
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 27 +--
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 43 ++--
.../flink/action/cdc/mongodb/MongodbSchema.java | 83 ++++---
.../{ModeEnum.java => SchemaAcquisitionMode.java} | 8 +-
.../mongodb/strategy/Mongo4VersionStrategy.java | 92 ++++++--
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 91 +++++---
.../action/cdc/mongodb/JsonParserUtilsTest.java | 76 -------
.../flink/action/cdc/mongodb/MongoDBContainer.java | 11 -
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 2 +-
.../cdc/mongodb/MongoDBSyncTableActionITCase.java | 2 +-
22 files changed, 799 insertions(+), 532 deletions(-)
diff --git a/LICENSE b/LICENSE
index 74613fb9d..75d02b53a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -222,6 +222,9 @@ from https://github.com/aws/aws-sdk-java version 1.12.319
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
from https://ververica.github.io/flink-cdc-connectors/ version 2.3.0
+paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+from https://ververica.github.io/flink-cdc-connectors/ version 2.4.0
+
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
from https://github.com/apache/doris-flink-connector/ version 1.4.0
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index b7158e335..670827d3b 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -449,6 +449,9 @@ To use this feature through `flink run`, run the following
shell command.
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf
<paimon-table-sink-conf> ...]]
```
+
+{{< generated/mongodb_sync_table >}}
+
Here are a few points to take note of:
1. The "mongodb-conf" introduces the "schema.start.mode" parameter on top of the MongoDB CDC source configuration. "schema.start.mode" provides two modes: "dynamic" (default) and "specified".
@@ -459,6 +462,56 @@ The difference between the two is that the "specify" mode
requires the user to e
Dynamic mode, on the other hand, ensures that Paimon and MongoDB always keep
the top-level fields consistent, eliminating the need to focus on specific
fields.
Further processing of the data table is required when using values from nested
fields.
+{{< generated/mongodb_operator >}}
+
+
+Functions can be invoked at the tail end of a path: the input to a function is the output of the path expression, and the type of the function's output is determined by the function itself.
+
+{{< generated/mongodb_functions >}}
+
+Path Examples
+```json
+{
+ "store": {
+ "book": [
+ {
+ "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ {
+ "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99
+ },
+ {
+ "category": "fiction",
+ "author": "Herman Melville",
+ "title": "Moby Dick",
+ "isbn": "0-553-21311-3",
+ "price": 8.99
+ },
+ {
+ "category": "fiction",
+ "author": "J. R. R. Tolkien",
+ "title": "The Lord of the Rings",
+ "isbn": "0-395-19395-8",
+ "price": 22.99
+ }
+ ],
+ "bicycle": {
+ "color": "red",
+ "price": 19.95
+ }
+ },
+ "expensive": 10
+}
+```
+
+{{< generated/mongodb_path_example >}}
+
2. The synchronized table is required to have its primary key set as `_id`.
This is because MongoDB's change events do not record the state of a document before it is updated.
Consequently, we can only convert them into Flink's UPSERT changelog stream, which requires a unique key such as `_id`.
@@ -509,7 +562,7 @@ Example 2: Synchronize collection into a Paimon table
according to the specified
--mongodb-conf collection=source_table1 \
--mongodb-conf schema.start.mode=specified \
--mongodb-conf field.name=_id,name,description \
- --mongodb-conf parser.path=_id,name,description \
+ --mongodb-conf parser.path=$._id,$.name,$.description \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
@@ -538,6 +591,8 @@ To use this feature through `flink run`, run the following
shell command.
[--table-conf <paimon-table-sink-conf> [--table-conf
<paimon-table-sink-conf> ...]]
```
+{{< generated/mongodb_sync_database >}}
+
All collections to be synchronized must use `_id` as the primary key.
For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table.
Its schema will be derived from all specified MongoDB collections. If the Paimon table already exists, its schema will be compared against the schema of all specified MongoDB collections.
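In "specified" mode, Example 2 passes `field.name=_id,name,description` together with `parser.path=$._id,$.name,$.description`. Reading the two lists as paired by position (the first column takes the value of the first path, and so on) is an assumption made here for illustration. The Java sketch below shows that pairing; `SpecifiedFieldPaths` and `pairFieldsWithPaths` are hypothetical names, not Paimon APIs, and the sketch assumes no single path contains a comma.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: pairs field.name entries with parser.path entries by position. */
public class SpecifiedFieldPaths {

    public static Map<String, String> pairFieldsWithPaths(String fieldNames, String parserPaths) {
        String[] names = fieldNames.split(",");
        String[] paths = parserPaths.split(",");
        // Both lists must have the same number of entries for a positional pairing to make sense.
        if (names.length != paths.length) {
            throw new IllegalArgumentException(
                    "field.name has " + names.length + " entries but parser.path has " + paths.length);
        }
        // LinkedHashMap keeps the column order given by the user.
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            result.put(names[i].trim(), paths[i].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> mapping =
                pairFieldsWithPaths("_id,name,description", "$._id,$.name,$.description");
        System.out.println(mapping);
    }
}
```

Failing fast on a length mismatch is a design choice of this sketch: a silent truncation would drop columns without any signal to the user.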
diff --git a/docs/layouts/shortcodes/generated/mongodb_functions.html
b/docs/layouts/shortcodes/generated/mongodb_functions.html
new file mode 100644
index 000000000..a10f41d16
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_functions.html
@@ -0,0 +1,95 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">Function</th>
+ <th class="text-left" style="width: 70%">Description</th>
+ <th class="text-left" style="width: 15%">Output type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>min()</h5></td>
+ <td>Provides the min value of an array of numbers.</td>
+ <td>Double</td>
+ </tr>
+ <tr>
+ <td><h5>max()</h5></td>
+ <td>Provides the max value of an array of numbers.</td>
+ <td>Double</td>
+ </tr>
+ <tr>
+ <td><h5>avg()</h5></td>
+ <td>Provides the average value of an array of numbers.</td>
+ <td>Double</td>
+ </tr>
+ <tr>
+ <td><h5>stddev()</h5></td>
+ <td>Provides the standard deviation value of an array of numbers.</td>
+ <td>Double</td>
+ </tr>
+ <tr>
+ <td><h5>length()</h5></td>
+ <td>Provides the length of an array.</td>
+ <td>Integer</td>
+ </tr>
+ <tr>
+ <td><h5>sum()</h5></td>
+ <td>Provides the sum value of an array of numbers.</td>
+ <td>Double</td>
+ </tr>
+ <tr>
+ <td><h5>keys()</h5></td>
+ <td>Provides the property keys (an alternative to the terminal tilde ~).</td>
+ <td>Set</td>
+ </tr>
+ <tr>
+ <td><h5>concat(X)</h5></td>
+ <td>Provides a concatenated version of the path output with a new item.</td>
+ <td>Like input</td>
+ </tr>
+ <tr>
+ <td><h5>append(X)</h5></td>
+ <td>Adds an item to the JSON path output array.</td>
+ <td>Like input</td>
+ </tr>
+ <tr>
+ <td><h5>first()</h5></td>
+ <td>Provides the first item of an array.</td>
+ <td>Depends on the array</td>
+ </tr>
+ <tr>
+ <td><h5>last()</h5></td>
+ <td>Provides the last item of an array.</td>
+ <td>Depends on the array</td>
+ </tr>
+ <tr>
+ <td><h5>index(X)</h5></td>
+ <td>Provides the item of an array at index X. If X is negative, the index counts backwards from the end of the array.</td>
+ <td>Depends on the array</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
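To make the numeric functions in `mongodb_functions.html` concrete, the following plain-Java sketch computes `min()`, `max()`, `sum()`, `avg()`, `stddev()` and `length()` over the four book prices of the sample document in the Path Examples section, which is what a path such as `$..book[*].price` would select. It does not call the JsonPath library. It assumes `stddev()` is the population standard deviation (dividing by n, not n - 1); `PathFunctionsSketch` is a hypothetical name.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

/** Hypothetical illustration of the numeric path functions, computed with the JDK only. */
public class PathFunctionsSketch {

    /** Population standard deviation: square root of the mean squared distance from the mean. */
    public static double stddev(List<Double> values) {
        double mean = values.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
        double sumSquaredDiffs = 0;
        for (double v : values) {
            sumSquaredDiffs += (v - mean) * (v - mean);
        }
        return Math.sqrt(sumSquaredDiffs / values.size());
    }

    /** min(), max(), sum() and avg() in one pass via the JDK's summary statistics. */
    public static DoubleSummaryStatistics stats(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).summaryStatistics();
    }

    public static void main(String[] args) {
        // Prices of the four books in the sample document.
        List<Double> prices = List.of(8.95, 12.99, 8.99, 22.99);
        DoubleSummaryStatistics s = stats(prices);
        System.out.println("min()    = " + s.getMin());
        System.out.println("max()    = " + s.getMax());
        System.out.println("sum()    = " + s.getSum());
        System.out.println("avg()    = " + s.getAverage());
        System.out.println("stddev() = " + stddev(prices));
        System.out.println("length() = " + prices.size());
    }
}
```

The two-pass form of `stddev` avoids the cancellation error that the single-pass "mean of squares minus square of mean" formula can show for nearly identical values.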
diff --git a/docs/layouts/shortcodes/generated/mongodb_operator.html
b/docs/layouts/shortcodes/generated/mongodb_operator.html
new file mode 100644
index 000000000..2f4f99197
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_operator.html
@@ -0,0 +1,65 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">Operator</th>
+ <th class="text-left" style="width: 85%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>$</h5></td>
+ <td>The root element to query. This starts all path expressions.</td>
+ </tr>
+ <tr>
+ <td><h5>@</h5></td>
+ <td>The current node being processed by a filter predicate.</td>
+ </tr>
+ <tr>
+ <td><h5>*</h5></td>
+ <td>Wildcard. Available anywhere a name or number is required.</td>
+ </tr>
+ <tr>
+ <td><h5>..</h5></td>
+ <td>Deep scan. Available anywhere a name is required.</td>
+ </tr>
+ <tr>
+ <td><h5>.</h5></td>
+ <td>Dot-notated child.</td>
+ </tr>
+ <tr>
+ <td><h5>['{name}' (, '{name}')]</h5></td>
+ <td>Bracket-notated child or children.</td>
+ </tr>
+ <tr>
+ <td><h5>[{number} (, {number})]</h5></td>
+ <td>Array index or indexes.</td>
+ </tr>
+ <tr>
+ <td><h5>[start:end]</h5></td>
+ <td>Array slice operator.</td>
+ </tr>
+ <tr>
+ <td><h5>[?({expression})]</h5></td>
+ <td>Filter expression. Expression must evaluate to a boolean
value.</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
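The `[start:end]` operator and negative indexes follow the convention illustrated in the path examples: `start` is inclusive, `end` is exclusive, either bound may be omitted, and negative values count from the end of the array. The plain-Java sketch below encodes that convention on a `List`. `JsonPathSlice`, `slice` and `elementAt` are hypothetical names, and clamping out-of-range bounds is an assumption of this sketch rather than documented JsonPath behavior.

```java
import java.util.List;

/** Hypothetical illustration of JsonPath-style slicing and negative indexing. */
public class JsonPathSlice {

    /** Resolves a possibly negative position, then clamps it to [0, size]. */
    private static int resolve(int position, int size) {
        int resolved = position < 0 ? size + position : position;
        return Math.max(0, Math.min(resolved, size));
    }

    /** Elements from start (inclusive) to end (exclusive); a null bound means "omitted". */
    public static <T> List<T> slice(List<T> items, Integer start, Integer end) {
        int from = start == null ? 0 : resolve(start, items.size());
        int to = end == null ? items.size() : resolve(end, items.size());
        return from >= to ? List.<T>of() : items.subList(from, to);
    }

    /** Single element; a negative index counts from the end, so -1 is the last item. */
    public static <T> T elementAt(List<T> items, int index) {
        int i = index < 0 ? items.size() + index : index;
        if (i < 0 || i >= items.size()) {
            throw new IndexOutOfBoundsException("index " + index + " for size " + items.size());
        }
        return items.get(i);
    }

    public static void main(String[] args) {
        List<String> books = List.of(
                "Sayings of the Century", "Sword of Honour", "Moby Dick", "The Lord of the Rings");
        System.out.println(slice(books, null, 2));  // like $..book[:2]
        System.out.println(slice(books, 1, 2));     // like $..book[1:2]
        System.out.println(slice(books, -2, null)); // like $..book[-2:]
        System.out.println(slice(books, 2, null));  // like $..book[2:]
        System.out.println(elementAt(books, -2));   // like $..book[-2]
    }
}
```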
diff --git a/docs/layouts/shortcodes/generated/mongodb_path_example.html
b/docs/layouts/shortcodes/generated/mongodb_path_example.html
new file mode 100644
index 000000000..fb1c6a874
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_path_example.html
@@ -0,0 +1,97 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">JsonPath</th>
+ <th class="text-left" style="width: 85%">Result</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>$.store.book[*].author</h5></td>
+ <td>The authors of all books.</td>
+ </tr>
+ <tr>
+ <td><h5>$..author</h5></td>
+ <td>All authors.</td>
+ </tr>
+ <tr>
+ <td><h5>$.store.*</h5></td>
+ <td>All things, both books and bicycles.</td>
+ </tr>
+ <tr>
+ <td><h5>$.store..price</h5></td>
+ <td>The price of everything in the store.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[2]</h5></td>
+ <td>The third book.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[-2]</h5></td>
+ <td>The second to last book.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[0,1]</h5></td>
+ <td>The first two books.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[:2]</h5></td>
+ <td>All books from index 0 (inclusive) until index 2 (exclusive).</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[1:2]</h5></td>
+ <td>All books from index 1 (inclusive) until index 2 (exclusive).</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[-2:]</h5></td>
+ <td>The last two books.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[2:]</h5></td>
+ <td>All books from index 2 (inclusive) to the last.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[?(@.isbn)]</h5></td>
+ <td>All books with an ISBN number.</td>
+ </tr>
+ <tr>
+ <td><h5>$.store.book[?(@.price &lt; 10)]</h5></td>
+ <td>All books in the store cheaper than 10.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[?(@.price &lt;= $['expensive'])]</h5></td>
+ <td>All books in the store that are not "expensive".</td>
+ </tr>
+ <tr>
+ <td><h5>$..book[?(@.author =~ /.*REES/i)]</h5></td>
+ <td>All books whose author matches the regex, ignoring case.</td>
+ </tr>
+ <tr>
+ <td><h5>$..*</h5></td>
+ <td>Every member and element of the document, at every level.</td>
+ </tr>
+ <tr>
+ <td><h5>$..book.length()</h5></td>
+ <td>The number of books.</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
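The filter rows of the path example table can be cross-checked by hand. The sketch below models the four books of the sample document as plain Java objects and applies the same predicates with streams: `?(@.price < 10)`, `?(@.isbn)` and `?(@.price <= $['expensive'])` with `expensive` = 10. It mirrors the semantics only and does not use the JsonPath library; the `BookFilters` and `Book` classes are hypothetical stand-ins.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the sample document, used to check the filter examples. */
public class BookFilters {

    /** One entry of the "book" array; isbn is null when the field is absent. */
    public static final class Book {
        public final String title;
        public final double price;
        public final String isbn;

        public Book(String title, double price, String isbn) {
            this.title = title;
            this.price = price;
            this.isbn = isbn;
        }
    }

    public static final List<Book> BOOKS = List.of(
            new Book("Sayings of the Century", 8.95, null),
            new Book("Sword of Honour", 12.99, null),
            new Book("Moby Dick", 8.99, "0-553-21311-3"),
            new Book("The Lord of the Rings", 22.99, "0-395-19395-8"));

    /** Value of the top-level "expensive" field in the sample document. */
    public static final double EXPENSIVE = 10;

    /** Titles of the books that satisfy the predicate, in document order. */
    public static List<String> titles(Predicate<Book> filter) {
        return BOOKS.stream().filter(filter).map(b -> b.title).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(titles(b -> b.price < 10));         // ?(@.price < 10)
        System.out.println(titles(b -> b.isbn != null));       // ?(@.isbn)
        System.out.println(titles(b -> b.price <= EXPENSIVE)); // ?(@.price <= $['expensive'])
    }
}
```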
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
new file mode 100644
index 000000000..a7c9ae94f
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -0,0 +1,65 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">Configuration</th>
+ <th class="text-left" style="width: 85%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>--warehouse</h5></td>
+ <td>The path to Paimon warehouse.</td>
+ </tr>
+ <tr>
+ <td><h5>--database</h5></td>
+ <td>The database name in Paimon catalog.</td>
+ </tr>
+ <tr>
+ <td><h5>--table-prefix</h5></td>
+ <td>The prefix of all Paimon tables to be synchronized. For example,
if you want all synchronized tables to have "ods_" as prefix, you can specify
"--table-prefix ods_".</td>
+ </tr>
+ <tr>
+ <td><h5>--table-suffix</h5></td>
+ <td>The suffix of all Paimon tables to be synchronized. The usage is the same as "--table-prefix".</td>
+ </tr>
+ <tr>
+ <td><h5>--including-tables</h5></td>
+ <td>It is used to specify which source tables are to be synchronized.
You must use '|' to separate multiple tables.Because '|' is a special
character, a comma is required, for example: 'a|b|c'.Regular expression is
supported, for example, specifying "--including-tables test|paimon.*" means to
synchronize table 'test' and all tables start with 'paimon'.</td>
+ </tr>
+ <tr>
+ <td><h5>--excluding-tables</h5></td>
+ <td>It is used to specify which source tables are not to be synchronized. The usage is the same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if both are specified.</td>
+ </tr>
+ <tr>
+ <td><h5>--mongodb-conf</h5></td>
+ <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password and database are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+ </tr>
+ <tr>
+ <td><h5>--catalog-conf</h5></td>
+ <td>The configuration for Paimon catalog. Each configuration should be
specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a
complete list of catalog configurations.</td>
+ </tr>
+ <tr>
+ <td><h5>--table-conf</h5></td>
+ <td>The configuration for Paimon table sink. Each configuration should
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a
complete list of table configurations.</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
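The interaction between "--including-tables" and "--excluding-tables" can be summarized as a single rule: a collection is synchronized if its name matches the including pattern and does not match the excluding pattern. The sketch below expresses that rule with `java.util.regex`. It assumes whole-name matching (`Matcher.matches()`) and that an absent excluding pattern excludes nothing; `TableFilter` and `shouldSync` are hypothetical names, not the action's actual code.

```java
import java.util.regex.Pattern;

/** Hypothetical helper illustrating --including-tables / --excluding-tables semantics. */
public class TableFilter {

    private final Pattern including;
    private final Pattern excluding; // null when --excluding-tables is not given

    public TableFilter(String includingTables, String excludingTables) {
        this.including = Pattern.compile(includingTables);
        this.excluding = excludingTables == null ? null : Pattern.compile(excludingTables);
    }

    /** True if the name fully matches "including" and does not fully match "excluding". */
    public boolean shouldSync(String collectionName) {
        // --excluding-tables has higher priority, so it is checked first.
        if (excluding != null && excluding.matcher(collectionName).matches()) {
            return false;
        }
        return including.matcher(collectionName).matches();
    }

    public static void main(String[] args) {
        TableFilter filter = new TableFilter("test|paimon.*", "paimon_tmp.*");
        for (String name : new String[] {"test", "test_1", "paimon_orders", "paimon_tmp_1"}) {
            System.out.println(name + " -> " + filter.shouldSync(name));
        }
    }
}
```

Whole-name matching is what makes `test` select only a collection named exactly `test`; with partial matching (`find()`), `test_1` would be selected as well.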
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_table.html
b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
new file mode 100644
index 000000000..440580eb7
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
@@ -0,0 +1,57 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">Configuration</th>
+ <th class="text-left" style="width: 85%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>--warehouse</h5></td>
+ <td>The path to Paimon warehouse.</td>
+ </tr>
+ <tr>
+ <td><h5>--database</h5></td>
+ <td>The database name in Paimon catalog.</td>
+ </tr>
+ <tr>
+ <td><h5>--table</h5></td>
+ <td>The Paimon table name.</td>
+ </tr>
+ <tr>
+ <td><h5>--partition-keys</h5></td>
+ <td>The partition keys for Paimon table. If there are multiple partition keys, separate them with commas, for example "dt,hh,mm".</td>
+ </tr>
+ <tr>
+ <td><h5>--mongodb-conf</h5></td>
+ <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+ </tr>
+ <tr>
+ <td><h5>--catalog-conf</h5></td>
+ <td>The configuration for Paimon catalog. Each configuration should be
specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a
complete list of catalog configurations.</td>
+ </tr>
+ <tr>
+ <td><h5>--table-conf</h5></td>
+ <td>The configuration for Paimon table sink. Each configuration should
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a
complete list of table configurations.</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
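The "--mongodb-conf", "--catalog-conf" and "--table-conf" options are each repeated once per setting, in the format "key=value". A minimal sketch of collecting such arguments into an ordered map follows. It assumes the key ends at the first `=`, so values that themselves contain `=` survive intact, and that a repeated key keeps its last value. `ConfParser` is a hypothetical name, not the parser the Paimon actions use, and the `hosts` and `database` values in `main` are illustrative placeholders.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns repeated "key=value" arguments into an ordered map. */
public class ConfParser {

    public static Map<String, String> parse(List<String> pairs) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String pair : pairs) {
            int eq = pair.indexOf('=');
            // Reject arguments with no '=' or with an empty key.
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            // Split on the first '=' only; a later duplicate key overwrites the earlier value.
            conf.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "hosts=mongo1:27017",
                "database=inventory",
                "collection=source_table1",
                "parser.path=$._id,$.name,$.description"));
        System.out.println(conf);
    }
}
```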
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 0ffd1fb32..4ea553249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -27,6 +27,7 @@ import org.apache.paimon.types.DataTypeJsonParser;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.Module;
@@ -38,7 +39,10 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/** A utility class that provides JSON serialization and deserialization. */
public class JsonSerdeUtil {
@@ -54,6 +58,25 @@ public class JsonSerdeUtil {
OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule());
}
+ public static <V> LinkedHashMap<String, V> parseJsonMap(String jsonString, Class<V> valueType) {
+ try {
+ LinkedHashMap<String, Object> originalMap =
+ OBJECT_MAPPER_INSTANCE.readValue(
+ jsonString, new TypeReference<LinkedHashMap<String, Object>>() {});
+ return originalMap.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
+ OBJECT_MAPPER_INSTANCE.convertValue(
+ entry.getValue(), valueType),
+ (oldValue, newValue) -> oldValue,
+ LinkedHashMap::new));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Error parsing JSON string", e);
+ }
+ }
+
/**
* Retrieves a specific node from the given root node and casts it to the
specified type.
*
@@ -61,7 +84,7 @@ public class JsonSerdeUtil {
* @param root The root node from which the specific node is to be
retrieved.
* @param fieldName The name of the field to retrieve.
* @param clazz The class of the node to be returned.
- * @return The node casted to the specified type.
+ * @return The node cast to the specified type.
* @throws IllegalArgumentException if the node is not present or if it's
not of the expected
* type.
*/
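`parseJsonMap` uses the four-argument `Collectors.toMap` with `LinkedHashMap::new` to keep the document's field order. One caveat worth checking: that collector delegates to `Map.merge`, which throws a `NullPointerException` when a mapped value is `null`, as a JSON field such as `"description": null` would likely produce after `convertValue`. The stdlib-only sketch below reproduces the behavior on an already-decoded `Map` (it does not call Jackson) and shows a loop-based alternative that keeps both the order and the nulls. `OrderedMapConversion`, `viaCollectors` and `viaLoop` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical comparison of two order-preserving map conversions. */
public class OrderedMapConversion {

    /** Same shape as the stream in parseJsonMap: fails if any converted value is null. */
    public static <V> LinkedHashMap<String, V> viaCollectors(
            Map<String, Object> decoded, Function<Object, V> convert) {
        return decoded.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> convert.apply(entry.getValue()),
                                (oldValue, newValue) -> oldValue,
                                LinkedHashMap::new));
    }

    /** Loop-based alternative: keeps insertion order and tolerates null values. */
    public static <V> LinkedHashMap<String, V> viaLoop(
            Map<String, Object> decoded, Function<Object, V> convert) {
        LinkedHashMap<String, V> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : decoded.entrySet()) {
            result.put(entry.getKey(), convert.apply(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for a decoded document {"_id": 1, "name": "a", "description": null}.
        Map<String, Object> decoded = new LinkedHashMap<>();
        decoded.put("_id", 1);
        decoded.put("name", "a");
        decoded.put("description", null);

        Function<Object, String> toStringOrNull = v -> v == null ? null : v.toString();
        System.out.println(viaLoop(decoded, toStringOrNull));
        try {
            viaCollectors(decoded, toStringOrNull);
        } catch (NullPointerException e) {
            System.out.println("Collectors.toMap rejected the null value");
        }
    }
}
```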
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 53ff1344d..200e3026d 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -40,6 +40,7 @@ under the License.
<geometry.version>2.2.0</geometry.version>
<avro.version>1.11.1</avro.version>
<mongodb.testcontainers.version>1.18.3</mongodb.testcontainers.version>
+ <json-path.version>2.8.0</json-path.version>
</properties>
<dependencies>
@@ -174,6 +175,12 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>${json-path.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
deleted file mode 100644
index 61340542d..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtils.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.mongodb;
-
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.type.MapType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * specified, and return json string of the extracted json object. It will
return null if the input
- * json string is invalid. A limited version of JSONPath supported: $ : Root
object . : Child
- * operator [] : Subscript operator for array * : Wildcard for [] Syntax not
supported that's worth
- * noticing: '' : Zero length string as key .. : Recursive descent &#064;
: Current
- * object/element () : Script expression ?() : Filter (script) expression. [,]
: Union operator
- * [start:end:step] : array slice operator.
- */
-@SuppressWarnings("unchecked")
-public class JsonParserUtils implements Serializable {
-
- private static final Pattern patternKey =
Pattern.compile("^([a-zA-Z0-9_\\-:\\s]+).*");
- private static final Pattern patternIndex =
Pattern.compile("\\[([0-9]+|\\*)]");
- private static final JsonFactory JSON_FACTORY = new JsonFactory();
-
- static {
- // Allows for unescaped ASCII control characters in JSON values
- JSON_FACTORY.enable(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
- }
-
- private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
- private static final MapType MAP_TYPE =
- MAPPER.getTypeFactory().constructMapType(Map.class, String.class,
Object.class);
-
- // An LRU cache using a linked hash map
- static class HashCache<K, V> extends LinkedHashMap<K, V> {
-
- private static final int CACHE_SIZE = 16;
- private static final int INIT_SIZE = 32;
- private static final float LOAD_FACTOR = 0.6f;
-
- HashCache() {
- super(INIT_SIZE, LOAD_FACTOR);
- }
-
- private static final long serialVersionUID = 1;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- return size() > CACHE_SIZE;
- }
- }
-
- static Map<String, Object> extractObjectCache = new HashCache<>();
- static Map<String, String[]> pathExprCache = new HashCache<>();
- static Map<String, ArrayList<String>> indexListCache = new HashCache<>();
- static Map<String, String> mKeyGroupCache = new HashCache<>();
- static Map<String, Boolean> mKeyMatchesCache = new HashCache<>();
-
- public static String evaluate(String jsonString, String pathString) {
-
- if (jsonString == null
- || jsonString.equals("")
- || pathString == null
- || pathString.equals("")) {
- return null;
- }
-
- // Cache pathExpr
- String[] pathExpr = pathExprCache.computeIfAbsent(pathString, s ->
s.split("\\.", -1));
-
- if (!pathExpr[0].equalsIgnoreCase("$")) {
- return null;
- }
- // Cache extractObject
- Object extractObject = extractObjectCache.get(jsonString);
- if (extractObject == null) {
- try {
- extractObject = MAPPER.readValue(jsonString, MAP_TYPE);
- } catch (Exception e) {
- return null;
- }
- extractObjectCache.put(jsonString, extractObject);
- }
- for (int i = 1; i < pathExpr.length; i++) {
- if (extractObject == null) {
- return null;
- }
- extractObject = extract(extractObject, pathExpr[i]);
- }
- if (extractObject instanceof Map || extractObject instanceof List) {
- try {
- return MAPPER.writeValueAsString(extractObject);
- } catch (Exception e) {
- return null;
- }
- } else if (extractObject != null) {
- return extractObject.toString();
- } else {
- return null;
- }
- }
-
- private static Object extract(Object json, String path) {
-
- // Cache patternkey.matcher(path).matches()
- Matcher mKey = null;
- Boolean mKeyMatches = mKeyMatchesCache.get(path);
- if (mKeyMatches == null) {
- mKey = patternKey.matcher(path);
- mKeyMatches = mKey.matches() ? Boolean.TRUE : Boolean.FALSE;
- mKeyMatchesCache.put(path, mKeyMatches);
- }
- if (!mKeyMatches) {
- return null;
- }
-
- // Cache mkey.group(1)
- String mKeyGroup1 = mKeyGroupCache.get(path);
- if (mKeyGroup1 == null) {
- if (mKey == null) {
- mKey = patternKey.matcher(path);
- }
- mKeyGroup1 = mKey.group(1);
- mKeyGroupCache.put(path, mKeyGroup1);
- }
- json = extract_json_key(json, mKeyGroup1);
-
- // Cache indexList
- ArrayList<String> indexList = indexListCache.get(path);
- if (indexList == null) {
- Matcher mIndex = patternIndex.matcher(path);
- indexList = new ArrayList<>();
- while (mIndex.find()) {
- indexList.add(mIndex.group(1));
- }
- indexListCache.put(path, indexList);
- }
-
- if (indexList.size() > 0) {
- json = extract_json_withIndex(json, indexList);
- }
-
- return json;
- }
-
- private static Object extract_json_withIndex(Object json,
ArrayList<String> indexList) {
- List<Object> jsonList = new ArrayList<>();
- jsonList.add(json);
- for (String index : indexList) {
- List<Object> tmpJsonList = new ArrayList<>();
- if (index.equalsIgnoreCase("*")) {
- for (Object array : jsonList) {
- if (array instanceof List) {
- tmpJsonList.addAll((List<Object>) array);
- }
- }
- jsonList = tmpJsonList;
- } else {
- for (int i = 0; i < (jsonList).size(); i++) {
- Object array = jsonList.get(i);
- int indexValue = Integer.parseInt(index);
- if (!(array instanceof List)) {
- continue;
- }
- if (indexValue >= ((List<Object>) array).size()) {
- return null;
- }
- tmpJsonList.add(((List<Object>) array).get(indexValue));
- jsonList = tmpJsonList;
- }
- }
- }
- if (jsonList.isEmpty()) {
- return null;
- }
- return (jsonList.size() > 1) ? new ArrayList<>(jsonList) :
jsonList.get(0);
- }
-
- private static Object extract_json_key(Object json, String path) {
- if (json instanceof List) {
- List<Object> jsonArray = new ArrayList<>();
- for (int i = 0; i < ((List<Object>) json).size(); i++) {
- Object jsonElem = ((List<Object>) json).get(i);
- Object jsonObj;
- if (jsonElem instanceof Map) {
- jsonObj = ((Map<String, Object>) jsonElem).get(path);
- } else {
- continue;
- }
- if (jsonObj instanceof List) {
- jsonArray.addAll((List<Object>) jsonObj);
- } else if (jsonObj != null) {
- jsonArray.add(jsonObj);
- }
- }
- return (jsonArray.size() == 0) ? null : jsonArray;
- } else if (json instanceof Map) {
- return ((Map<String, Object>) json).get(path);
- } else {
- return null;
- }
- }
-
- public static LinkedHashMap<String, String> extractMap(String jsonString) {
- try {
- LinkedHashMap<String, ?> originalMap = MAPPER.readValue(jsonString, MAP_TYPE);
- LinkedHashMap<String, String> stringMap = new LinkedHashMap<>();
-
- originalMap.forEach(
- (key, value) -> {
- String stringValue = Objects.toString(value, null);
- stringMap.put(key.toLowerCase(), stringValue);
- });
-
- return stringMap;
-
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
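The removed `extract_json_withIndex` applied bracketed index segments (`[0]`, `[*]`) to an already-parsed JSON value. The commit adds a `com.jayway.jsonpath.JsonPath` import to `MongoVersionStrategy`, which suggests that path handling now goes through that library instead. For readers comparing the two, here is a minimal stdlib-only sketch of the index rules as read from the removed code: `*` flattens arrays, a numeric index selects one element, and an out-of-range index yields `null`. The regex is an assumption, since `patternIndex` is defined outside this excerpt, and `IndexPathSketch` is a hypothetical name. One deliberate difference: the removed loop reassigned `jsonList = tmpJsonList` inside the inner `for`, so it stopped after the first array it indexed, whereas this sketch finishes each index level before moving on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: applies "[n]" / "[*]" index segments to a parsed JSON value. */
final class IndexPathSketch {
    // Assumed equivalent of the removed patternIndex: captures a number or '*' inside brackets.
    private static final Pattern INDEX = Pattern.compile("\\[([0-9]+|\\*)\\]");

    /** Returns the captured index tokens, e.g. "a[0][*]" -> ["0", "*"]. */
    static List<String> indexSegments(String pathSegment) {
        List<String> indices = new ArrayList<>();
        Matcher m = INDEX.matcher(pathSegment);
        while (m.find()) {
            indices.add(m.group(1));
        }
        return indices;
    }

    @SuppressWarnings("unchecked")
    static Object applyIndices(Object json, List<String> indices) {
        List<Object> current = new ArrayList<>();
        current.add(json);
        for (String index : indices) {
            List<Object> next = new ArrayList<>();
            for (Object candidate : current) {
                if (!(candidate instanceof List)) {
                    continue; // non-array values are dropped at this level
                }
                List<Object> array = (List<Object>) candidate;
                if (index.equals("*")) {
                    next.addAll(array); // wildcard flattens the array
                } else {
                    int i = Integer.parseInt(index);
                    if (i >= array.size()) {
                        return null; // out-of-range index yields null
                    }
                    next.add(array.get(i));
                }
            }
            current = next; // reassign only after the whole level is processed
        }
        if (current.isEmpty()) {
            return null;
        }
        return current.size() > 1 ? current : current.get(0);
    }
}
```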
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 1b062a8a1..5870db179 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -23,9 +23,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
@@ -36,38 +33,59 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.bson.Document;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Utils for MongoDB Action. */
+/**
+ * Utility class for MongoDB-related actions.
+ *
+ * <p>This class provides a set of utility methods to facilitate the creation and configuration of
+ * MongoDB sources, as well as the construction of Paimon schemas based on MongoDB schemas. It also
+ * includes methods for validating MongoDB configurations and fetching MongoDB version information.
+ *
+ * <p>Key functionalities include:
+ *
+ * <ul>
+ * <li>Building MongoDB sources with various configurations.
+ * <li>Constructing Paimon schemas based on MongoDB schemas.
+ * <li>Validating essential MongoDB configurations.
+ * </ul>
+ *
+ * <p>Note: This utility class is designed to be used in conjunction with Flink and Paimon
+ * integrations.
+ */
public class MongoDBActionUtils {
+ private static final String INITIAL_MODE = "initial";
+ private static final String LATEST_OFFSET_MODE = "latest-offset";
+ private static final String TIMESTAMP_MODE = "timestamp";
+ private static final String PRIMARY_KEY = "_id";
+
public static final ConfigOption<String> FIELD_NAME =
ConfigOptions.key("field.name")
.stringType()
.noDefaultValue()
- .withDescription(
- "Set the field names to be synchronized in the `specified` mode.");
+ .withDescription("Field names to synchronize when in `specified` mode.");
public static final ConfigOption<String> PARSER_PATH =
ConfigOptions.key("parser.path")
.stringType()
.noDefaultValue()
.withDescription(
- "Configure the JSON parsing path for synchronizing field values in the `specified` mode.");
+ "JSON parsing path for field synchronization in `specified` mode.");
public static final ConfigOption<String> START_MODE =
ConfigOptions.key("schema.start.mode")
.stringType()
.defaultValue("dynamic")
- .withDescription("Can choose between the `dynamic` and `specified` modes.");
+ .withDescription("Mode selection: `dynamic` or `specified`.");
static MongoDBSource<String> buildMongodbSource(Configuration mongodbConfig, String tableList) {
validateMongodbConfig(mongodbConfig);
@@ -100,14 +118,20 @@ public class MongoDBActionUtils {
.collectionList(tableList);
String startupMode = mongodbConfig.get(SourceOptions.SCAN_STARTUP_MODE);
- if ("initial".equalsIgnoreCase(startupMode)) {
- sourceBuilder.startupOptions(StartupOptions.initial());
- } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
- sourceBuilder.startupOptions(StartupOptions.latest());
- } else if ("timestamp".equalsIgnoreCase(startupMode)) {
- sourceBuilder.startupOptions(
- StartupOptions.timestamp(
- mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+ switch (startupMode.toLowerCase()) {
+ case INITIAL_MODE:
+ sourceBuilder.startupOptions(StartupOptions.initial());
+ break;
+ case LATEST_OFFSET_MODE:
+ sourceBuilder.startupOptions(StartupOptions.latest());
+ break;
+ case TIMESTAMP_MODE:
+ sourceBuilder.startupOptions(
+ StartupOptions.timestamp(
+ mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
}
Map<String, Object> customConverterConfigs = new HashMap<>();
@@ -136,48 +160,34 @@ public class MongoDBActionUtils {
List<String> specifiedPartitionKeys,
Map<String, String> paimonConfig,
boolean caseSensitive) {
- Schema.Builder builder = Schema.newBuilder();
- builder.options(paimonConfig);
-
- Map<String, DataType> mongodbFields;
- if (caseSensitive) {
- mongodbFields = mongodbSchema.fields();
- } else {
- mongodbFields = new LinkedHashMap<>();
- for (Map.Entry<String, DataType> entry : mongodbSchema.fields().entrySet()) {
- String fieldName = entry.getKey();
- checkArgument(
- !mongodbFields.containsKey(fieldName.toLowerCase()),
- String.format(
- "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
- fieldName, mongodbSchema.tableName()));
- mongodbFields.put(fieldName.toLowerCase(), entry.getValue());
- }
- }
- for (Map.Entry<String, DataType> entry : mongodbFields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue());
- }
-
- builder.primaryKey(Lists.newArrayList("_id"));
-
- if (specifiedPartitionKeys.size() > 0) {
+ Schema.Builder builder = Schema.newBuilder().options(paimonConfig);
+
+ Map<String, DataType> mongodbFields =
+ caseSensitive
+ ? mongodbSchema.fields()
+ : mongodbSchema.fields().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey().toLowerCase(),
+ Map.Entry::getValue,
+ (existing, replacement) -> {
+ throw new IllegalArgumentException(
+ String.format(
+ "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
+ existing,
+ mongodbSchema.tableName()));
+ },
+ LinkedHashMap::new));
+
+ mongodbFields.forEach(builder::column);
+
+ builder.primaryKey(Lists.newArrayList(PRIMARY_KEY));
+
+ if (!specifiedPartitionKeys.isEmpty()) {
builder.partitionKeys(specifiedPartitionKeys);
}
return builder.build();
}
-
- public static int getMongoDBVersion(Configuration mongodbConfig) {
- String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
- String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
-
- String url = String.format("mongodb://%s/%s", hosts, databaseName);
- try (MongoClient mongoClient = MongoClients.create(url)) {
- MongoDatabase database = mongoClient.getDatabase(databaseName);
- Document buildInfo = database.runCommand(new Document("buildInfo", 1));
- String[] split = ((String) buildInfo.get("version")).split("\\.");
- return Integer.parseInt(split[0]);
- }
- }
}
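In the new `buildPaimonSchema`, the merge function passed to `Collectors.toMap` receives the two colliding values, not the key, so `existing` fills the first `'%s'` with a `DataType` rather than the duplicated field name. A plain loop keeps the key in scope, as the removed `checkArgument` version did. The sketch below is a hypothetical helper: it uses generic values in place of Paimon's `DataType` and lowercases with `Locale.ROOT`, both choices made for the illustration rather than taken from the commit.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: lower-cases field names, rejecting case-insensitive collisions. */
final class CaseInsensitiveFields {
    static <V> LinkedHashMap<String, V> lowerCaseKeys(Map<String, V> fields, String tableName) {
        LinkedHashMap<String, V> result = new LinkedHashMap<>();
        for (Map.Entry<String, V> entry : fields.entrySet()) {
            String lowered = entry.getKey().toLowerCase(Locale.ROOT);
            // containsKey (not putIfAbsent) so that null values still count as collisions.
            if (result.containsKey(lowered)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
                                entry.getKey(), tableName));
            }
            result.put(lowered, entry.getValue()); // insertion order is preserved
        }
        return result;
    }
}
```

The error now names the offending original key (for example `'NAME'` when `Name` was already seen), which is what the message text promises.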
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index b0c06a627..a8aad81af 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -30,19 +30,38 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-/** Convert MongoDB Debezium JSON string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * A parser for MongoDB Debezium JSON strings, converting them into a list of {@link
+ * RichCdcMultiplexRecord}s.
+ *
+ * <p>This parser is designed to process and transform incoming MongoDB Debezium JSON records into a
+ * more structured format suitable for further processing. It takes into account the version of
+ * MongoDB and applies the appropriate strategy for extracting records.
+ *
+ * <p>Key features include:
+ *
+ * <ul>
+ * <li>Support for case-sensitive and case-insensitive field names.
+ * <li>Integration with a configurable table name converter.
+ * <li>Ability to work with different MongoDB version strategies (e.g., Mongo4, Mongo6).
+ * </ul>
+ *
+ * <p>Note: This parser is primarily intended for use in Flink streaming applications that process
+ * MongoDB CDC data.
+ */
public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
private static final String FIELD_DATABASE = "db";
private static final String FIELD_TABLE = "coll";
- private final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String FIELD_NAMESPACE = "ns";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final Configuration mongodbConfig;
private JsonNode root;
public MongoDBRecordParser(boolean caseSensitive, Configuration mongodbConfig) {
-
this(caseSensitive, new TableNameConverter(caseSensitive), mongodbConfig);
}
@@ -57,26 +76,31 @@ public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMulti
@Override
public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws Exception {
- root = objectMapper.readValue(value, JsonNode.class);
+ root = OBJECT_MAPPER.readValue(value, JsonNode.class);
String databaseName = extractString(FIELD_DATABASE);
String collection = tableNameConverter.convert(extractString(FIELD_TABLE));
MongoVersionStrategy versionStrategy =
- new Mongo4VersionStrategy(databaseName, collection, caseSensitive, mongodbConfig);
-
- // TODO:Upgrade mongodb cdc to 2.5 and enable scanFullChangelog capability
- // [Full Changelog]
- // https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html
- // if (mongodbVersion >= 6) {
- // versionStrategy = new Mongo6VersionStrategy(databaseName, collection,
- // caseSensitive);
- // } else {
- // versionStrategy = new Mongo4VersionStrategy(databaseName, collection,
- // caseSensitive);
- // }
+ VersionStrategyFactory.create(
+ databaseName, collection, caseSensitive, mongodbConfig);
versionStrategy.extractRecords(root).forEach(out::collect);
}
private String extractString(String key) {
- return root.get("ns").get(key).asText();
+ return root.get(FIELD_NAMESPACE).get(key).asText();
+ }
+
+ private static class VersionStrategyFactory {
+ static MongoVersionStrategy create(
+ String databaseName,
+ String collection,
+ boolean caseSensitive,
+ Configuration mongodbConfig) {
+ // TODO: When MongoDB CDC is upgraded to 2.5, uncomment the version check logic
+ // if (mongodbVersion >= 6) {
+ // return new Mongo6VersionStrategy(databaseName, collection, caseSensitive);
+ // }
+ return new Mongo4VersionStrategy(
+ databaseName, collection, caseSensitive, mongodbConfig);
+ }
}
}
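`VersionStrategyFactory.create` currently always returns `Mongo4VersionStrategy`. Its TODO, together with the removed `getMongoDBVersion` (which ran the `buildInfo` command and parsed the leading component of `version`), describes the intended check. A minimal sketch of only the parsing and selection step, with no server round trip; `VersionSelectionSketch` and its `Strategy` enum are hypothetical stand-ins for the two strategy classes.

```java
/** Hypothetical sketch of the version check described by the TODO in VersionStrategyFactory. */
final class VersionSelectionSketch {
    enum Strategy {
        MONGO4,
        MONGO6
    }

    /** Parses the major version from a buildInfo-style string such as "6.0.3". */
    static int majorVersion(String version) {
        // Same approach as the removed getMongoDBVersion: split on '.' and parse the first part.
        return Integer.parseInt(version.split("\\.")[0]);
    }

    /** Mirrors the commented-out rule: 6.x and later would use the Mongo6 strategy. */
    static Strategy strategyFor(String version) {
        return majorVersion(version) >= 6 ? Strategy.MONGO6 : Strategy.MONGO4;
    }
}
```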
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 82c879d89..e7d613512 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
@@ -47,27 +46,23 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
- * An {@link Action} which synchronize the whole MongoDB database into one Paimon database.
+ * An action class responsible for synchronizing MongoDB databases with a target system.
*
- * <p>You should specify MongoDB source database in {@code mongodbConfig}. See <a
- * href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document
- * of flink-cdc-connectors</a> for detailed keys and values.
+ * <p>This class provides functionality to read data from a MongoDB source, process it, and then
+ * synchronize it with a target system. It supports various configurations, including table
+ * prefixes, suffixes, and inclusion/exclusion patterns.
*
- * <p>For each MongoDB collections to be synchronized, if the corresponding Paimon table does not
- * exist, this action will automatically create the table. Its schema will be derived from all
- * specified MongoDB collections. If the Paimon table already exists, its schema will be compared
- * against the schema of all specified MongoDB collections.
- *
- * <p>This action supports a limited number of schema changes. Currently, the framework can not drop
- * columns, so the behaviors of `DROP` will be ignored, `RENAME` will add a new column. Currently
- * supported schema changes includes:
+ * <p>Key features include:
*
* <ul>
- * <li>Adding columns.
+ * <li>Support for case-sensitive and case-insensitive database and table names.
+ * <li>Configurable table name conversion with prefixes and suffixes.
+ * <li>Ability to include or exclude specific tables using regular expressions.
+ * <li>Integration with Flink's streaming environment for data processing.
* </ul>
*
- * <p>To automatically synchronize new table, This action creates a single sink for all Paimon
- * tables to be written. See {@link DatabaseSyncMode#COMBINED}.
+ * <p>Note: This action is primarily intended for use in Flink streaming applications that
+ * synchronize MongoDB data with other systems.
*/
public class MongoDBSyncDatabaseAction extends ActionBase {
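The new class comment for `MongoDBSyncDatabaseAction` lists include/exclude regular expressions and table prefixes/suffixes, but the option handling itself is outside this excerpt. The sketch below shows one plausible reading, assuming full-string matching (`Matcher.matches()`) and that the exclude pattern is checked after the include pattern. `CollectionRoutingSketch` and its constructor are hypothetical and are not Paimon's API.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical routing of a MongoDB collection name to a target table name. */
final class CollectionRoutingSketch {
    private final Pattern including;
    private final Pattern excluding; // null means "exclude nothing"
    private final String prefix;
    private final String suffix;

    CollectionRoutingSketch(
            String includingRegex, String excludingRegex, String prefix, String suffix) {
        this.including = Pattern.compile(includingRegex);
        this.excluding = excludingRegex == null ? null : Pattern.compile(excludingRegex);
        this.prefix = prefix;
        this.suffix = suffix;
    }

    /** Returns the target table name, or empty if the collection is filtered out. */
    Optional<String> targetTable(String collection) {
        // matches() requires the whole name to match, not just a substring.
        if (!including.matcher(collection).matches()) {
            return Optional.empty();
        }
        if (excluding != null && excluding.matcher(collection).matches()) {
            return Optional.empty();
        }
        return Optional.of(prefix + collection + suffix);
    }
}
```

For example, with include `orders.*`, exclude `orders_tmp`, prefix `ods_` and suffix `_v1`, the collection `orders_2023` would map to `ods_orders_2023_v1`, while `orders_tmp` and `users` would be skipped.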
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 6283f521d..3aaad32b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -41,22 +40,23 @@ import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
- * An {@link Action} which synchronize one MongoDB collection into one Paimon table.
+ * Represents an action to synchronize a specific MongoDB table with a target system.
*
- * <p>You should specify MongodbDB source topic in {@code mongodbConfig}. See <a
- * href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document
- * of flink-connectors</a> for detailed keys and values.
- *
- * <p>If the specified Paimon table does not exist, this action will automatically create the table.
- * Its schema will be derived from all specified MonodbDB collection. If the Paimon table already
- * exists, its schema will be compared against the schema of all specified MonodbDB collection.
- *
- * <p>This action supports a limited number of schema changes. Unsupported schema changes will be
- * ignored. Currently supported schema changes includes:
+ * <p>This action is responsible for:
*
* <ul>
- * <li>Adding columns.
+ * <li>Validating the provided MongoDB configuration.
+ * <li>Checking and ensuring the existence of the target database and table.
+ * <li>Setting up the necessary Flink streaming environment for data synchronization.
+ * <li>Handling case sensitivity considerations for database and table names.
* </ul>
+ *
+ * <p>Usage:
+ *
+ * <pre>
+ * MongoDBSyncTableAction action = new MongoDBSyncTableAction(...);
+ * action.run();
+ * </pre>
*/
public class MongoDBSyncTableAction extends ActionBase {
public final Configuration mongodbConfig;
@@ -106,18 +106,21 @@ public class MongoDBSyncTableAction extends ActionBase {
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
- EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
- RichCdcMultiplexRecordEventParser::new;
- Schema fromMongodb =
- MongoDBActionUtils.buildPaimonSchema(
- mongodbSchema, partitionKeys, tableConfig, caseSensitive);
- try {
+
+ // Check if table exists before trying to get or create it
+ if (catalog.tableExists(identifier)) {
table = (FileStoreTable) catalog.getTable(identifier);
- } catch (Exception e) {
+ } else {
+ Schema fromMongodb =
+ MongoDBActionUtils.buildPaimonSchema(
+ mongodbSchema, partitionKeys, tableConfig, caseSensitive);
catalog.createTable(identifier, fromMongodb, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
+ RichCdcMultiplexRecordEventParser::new;
+
CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
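`MongoDBSyncTableAction` now asks `catalog.tableExists(identifier)` up front instead of catching the exception thrown by `getTable`, and it only builds the Paimon schema when the table has to be created. A hypothetical in-memory sketch of that control flow, where a `HashMap` stands in for the catalog and a `Supplier` for `buildPaimonSchema`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical in-memory stand-in for a catalog, showing the check-then-create flow. */
final class GetOrCreateSketch {
    private final Map<String, String> tables = new HashMap<>();

    /** Returns the existing schema, building and registering one only if the table is missing. */
    String getOrCreate(String identifier, Supplier<String> schemaBuilder) {
        if (tables.containsKey(identifier)) {
            return tables.get(identifier);
        }
        // The schema is derived only on this path, like the moved buildPaimonSchema call.
        String schema = schemaBuilder.get();
        tables.put(identifier, schema);
        return schema;
    }
}
```

Check-then-create is not atomic: if another job creates the table between the two calls, `createTable(identifier, fromMongodb, false)` would presumably fail, because `ignoreIfExists` is `false`.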
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index 80cde22df..8c70ba4c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -30,6 +31,7 @@ import org.bson.Document;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -37,12 +39,22 @@ import java.util.Objects;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
-import static org.apache.paimon.shade.guava30.com.google.common.collect.Lists.newArrayList;
-import static org.apache.paimon.types.DataTypes.STRING;
-/** mongodb schema. */
+/**
+ * Represents the schema of a MongoDB collection.
+ *
+ * <p>This class provides methods to retrieve and manage the schema details of a MongoDB collection,
+ * including the database name, table (collection) name, fields, and primary keys. The schema can be
+ * acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the schema details are
+ * provided explicitly, while in the DYNAMIC mode, the schema is inferred from the first document in
+ * the collection.
+ *
+ * <p>The class also provides utility methods to generate schema fields and create a new MongoDB
+ * schema instance.
+ */
public class MongodbSchema {
+ private static final String ID_FIELD = "_id";
private final String databaseName;
private final String tableName;
private final Map<String, DataType> fields;
@@ -76,55 +88,60 @@ public class MongodbSchema {
}
public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
-
- ModeEnum mode = ModeEnum.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
- String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
- String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
- String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+ SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
switch (mode) {
case SPECIFIED:
- String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
- Map<String, DataType> schemaFields =
- generateSchemaFields(Arrays.asList(columnNames));
- return new MongodbSchema(
- databaseName, collectionName, schemaFields, newArrayList("_id"));
+ return createSchemaFromSpecifiedConfig(mongodbConfig);
case DYNAMIC:
- String url = String.format("mongodb://%s/%s", hosts, databaseName);
- try (MongoClient mongoClient = MongoClients.create(url)) {
- MongoDatabase database = mongoClient.getDatabase(databaseName);
- MongoCollection<Document> collection = database.getCollection(collectionName);
- Document firstDocument = collection.find().first();
- return createMongodbSchema(
- databaseName, collectionName, getColumnNames(firstDocument));
- }
+ return createSchemaFromDynamicConfig(mongodbConfig);
default:
- throw new RuntimeException();
+ throw new IllegalArgumentException("Unsupported schema acquisition mode: " + mode);
}
}
- private static List<String> getColumnNames(Document document) {
- if (document != null) {
- return new ArrayList<>(document.keySet());
+ private static SchemaAcquisitionMode getModeFromConfig(Configuration mongodbConfig) {
+ return SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
+ }
+
+ private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration mongodbConfig) {
+ String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
+ Map<String, DataType> schemaFields = generateSchemaFields(Arrays.asList(columnNames));
+ String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+ String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+ return new MongodbSchema(
+ databaseName, collectionName, schemaFields, Collections.singletonList(ID_FIELD));
+ }
+
+ private static MongodbSchema createSchemaFromDynamicConfig(Configuration mongodbConfig) {
+ String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
+ String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
+ String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
+ String url = String.format("mongodb://%s/%s", hosts, databaseName);
+ try (MongoClient mongoClient = MongoClients.create(url)) {
+ MongoDatabase database = mongoClient.getDatabase(databaseName);
+ MongoCollection<Document> collection = database.getCollection(collectionName);
+ Document firstDocument = collection.find().first();
+ return createMongodbSchema(databaseName, collectionName, getColumnNames(firstDocument));
}
- return null;
+ }
+
+ private static List<String> getColumnNames(Document document) {
+ return document != null ? new ArrayList<>(document.keySet()) : Collections.emptyList();
}
private static Map<String, DataType> generateSchemaFields(List<String> columnNames) {
Map<String, DataType> schemaFields = new LinkedHashMap<>();
-
- if (columnNames != null) {
- for (String columnName : columnNames) {
- schemaFields.put(columnName, STRING());
- }
+ for (String columnName : columnNames) {
+ schemaFields.put(columnName, DataTypes.STRING());
}
-
return schemaFields;
}
private static MongodbSchema createMongodbSchema(
String databaseName, String collectionName, List<String> columnNames) {
Map<String, DataType> schemaFields = generateSchemaFields(columnNames);
- return new MongodbSchema(databaseName, collectionName, schemaFields, newArrayList("_id"));
+ return new MongodbSchema(
+ databaseName, collectionName, schemaFields, Collections.singletonList(ID_FIELD));
}
@Override
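In `DYNAMIC` mode, `MongodbSchema` reads the first document of the collection, takes its keys in order as column names, types every column as `STRING`, and declares `_id` as the primary key. A stdlib-only sketch of the first two steps; `DynamicSchemaSketch` is hypothetical, a `Map<String, Object>` stands in for `org.bson.Document`, and the string `"STRING"` stands in for `DataTypes.STRING()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stdlib-only sketch of DYNAMIC-mode schema inference. */
final class DynamicSchemaSketch {
    /** Column names from the first document, or an empty list when the collection is empty. */
    static List<String> columnNames(Map<String, Object> firstDocument) {
        return firstDocument != null
                ? new ArrayList<>(firstDocument.keySet())
                : Collections.<String>emptyList();
    }

    /** Every column is typed as STRING, keeping document order. */
    static Map<String, String> schemaFields(List<String> columnNames) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : columnNames) {
            fields.put(name, "STRING");
        }
        return fields;
    }
}
```

Note that an empty collection yields no columns while `_id` is still declared as the primary key, so such a schema would likely be rejected later when the Paimon schema is built; that outcome is inferred from the code shown here rather than verified.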
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
similarity index 77%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
index 926999fb1..195eaa962 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/ModeEnum.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/SchemaAcquisitionMode.java
@@ -18,8 +18,12 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-/** schema acquisition mode. */
-public enum ModeEnum {
+/** Enum representing the modes of schema acquisition. */
+public enum SchemaAcquisitionMode {
+
+ /** Represents a mode where the schema is specified explicitly. */
SPECIFIED,
+
+ /** Represents a mode where the schema is determined dynamically based on the data. */
DYNAMIC
}
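`getModeFromConfig` maps `schema.start.mode` onto this enum with `valueOf(...toUpperCase())`, so an unknown value fails inside `valueOf` with Java's generic "No enum constant" message, before the `default` branch of the `switch` in `getMongodbSchema` can run. A sketch of a parser that names the option and lists the accepted values; `SchemaModeSketch` is a hypothetical stand-in for `SchemaAcquisitionMode`, and using `Locale.ROOT` (which avoids locale-sensitive case mapping such as the Turkish dotted `İ`) is a choice made for the illustration.

```java
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical stand-in for SchemaAcquisitionMode with a descriptive parser. */
enum SchemaModeSketch {
    SPECIFIED,
    DYNAMIC;

    static SchemaModeSketch fromConfig(String value) {
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Re-throw with the option name and the accepted values instead of "No enum constant".
            throw new IllegalArgumentException(
                    "Unsupported schema.start.mode '"
                            + value
                            + "', expected one of "
                            + Arrays.toString(values()),
                    e);
        }
    }
}
```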
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 1304ade16..22887109a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -33,7 +33,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-/** extract record implementation class 6.x>Mongodb Version>4.x. */
+/**
+ * Implementation class for extracting records from MongoDB versions greater than 4.x and less than
+ * 6.x.
+ */
public class Mongo4VersionStrategy implements MongoVersionStrategy {
private static final String FIELD_TYPE = "operationType";
@@ -58,42 +61,87 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
this.mongodbConfig = mongodbConfig;
}
+ /**
+ * Extracts records from the provided JsonNode based on the MongoDB version strategy.
+ *
+ * @param root The root JsonNode containing the MongoDB record.
+ * @return A list of RichCdcMultiplexRecord extracted from the root node.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
@Override
public List<RichCdcMultiplexRecord> extractRecords(JsonNode root) throws JsonProcessingException {
+ String op = root.get(FIELD_TYPE).asText();
+ JsonNode fullDocument = root.get(FIELD_DATA);
+ return handleOperation(op, fullDocument);
+ }
+
+ /**
+ * Handles the MongoDB operation type and processes the document accordingly.
+ *
+ * @param op The operation type (e.g., insert, update, replace).
+ * @param fullDocument The JsonNode representing the full MongoDB document.
+ * @return A list of RichCdcMultiplexRecord based on the operation type.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
+ private List<RichCdcMultiplexRecord> handleOperation(String op, JsonNode fullDocument)
+ throws JsonProcessingException {
List<RichCdcMultiplexRecord> records = new ArrayList<>();
LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
- String op = root.get(FIELD_TYPE).asText();
- JsonNode fullDocument = root.get(FIELD_DATA);
- // extract row kind and field values
switch (op) {
case OP_INSERT:
- Map<String, String> insert =
- getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
- records.add(
- new RichCdcMultiplexRecord(
- databaseName,
- collection,
- paimonFieldTypes,
- extractPrimaryKeys(),
- new CdcRecord(RowKind.INSERT, insert)));
+ records.add(handleInsert(fullDocument, paimonFieldTypes));
break;
case OP_REPLACE:
case OP_UPDATE:
- Map<String, String> after =
- getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
- records.add(
- new RichCdcMultiplexRecord(
- databaseName,
- collection,
- paimonFieldTypes,
- extractPrimaryKeys(),
- new CdcRecord(RowKind.UPDATE_AFTER, after)));
+ records.add(handleUpdateOrReplace(fullDocument, paimonFieldTypes));
break;
default:
throw new UnsupportedOperationException("Unknown record type: " + op);
}
return records;
}
+
+ /**
+ * Processes the insert operation and constructs a RichCdcMultiplexRecord.
+ *
+ * @param fullDocument The JsonNode representing the full MongoDB document for insertion.
+ * @param paimonFieldTypes A map to store the field types.
+ * @return A RichCdcMultiplexRecord representing the insert operation.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
+ private RichCdcMultiplexRecord handleInsert(
+ JsonNode fullDocument, LinkedHashMap<String, DataType> paimonFieldTypes)
+ throws JsonProcessingException {
+ Map<String, String> insert =
+ getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+ return new RichCdcMultiplexRecord(
+ databaseName,
+ collection,
+ paimonFieldTypes,
+ extractPrimaryKeys(),
+ new CdcRecord(RowKind.INSERT, insert));
+ }
+
+ /**
+ * Processes the update or replace operation and constructs a RichCdcMultiplexRecord.
+ *
+ * @param fullDocument The JsonNode representing the full MongoDB document for update/replace.
+ * @param paimonFieldTypes A map to store the field types.
+ * @return A RichCdcMultiplexRecord representing the update or replace operation.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
+ private RichCdcMultiplexRecord handleUpdateOrReplace(
+ JsonNode fullDocument, LinkedHashMap<String, DataType> paimonFieldTypes)
+ throws JsonProcessingException {
+ Map<String, String> after =
+ getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+ return new RichCdcMultiplexRecord(
+ databaseName,
+ collection,
+ paimonFieldTypes,
+ extractPrimaryKeys(),
+ new CdcRecord(RowKind.UPDATE_AFTER, after));
+ }
}
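`handleOperation` maps the change event's `operationType` to a row kind: inserts become `INSERT`, updates and replaces become `UPDATE_AFTER` built from the full document, and anything else throws. A sketch of just that mapping. The values of `OP_INSERT`, `OP_UPDATE` and `OP_REPLACE` are not shown in this excerpt, so the strings `"insert"`, `"update"` and `"replace"` (MongoDB's change-stream operation types) are an assumption, as is the `Kind` enum standing in for Paimon's `RowKind`.

```java
/** Hypothetical sketch of the operationType-to-row-kind mapping in handleOperation. */
final class OperationMappingSketch {
    /** Stand-in for RowKind, limited to the kinds this strategy emits. */
    enum Kind {
        INSERT,
        UPDATE_AFTER
    }

    static Kind kindFor(String operationType) {
        switch (operationType) {
            case "insert": // assumed value of OP_INSERT
                return Kind.INSERT;
            case "replace": // assumed value of OP_REPLACE
            case "update": // assumed value of OP_UPDATE
                return Kind.UPDATE_AFTER;
            default:
                throw new UnsupportedOperationException("Unknown record type: " + operationType);
        }
    }
}
```

Under that assumption, a `delete` event would reach the `default` branch and raise `UnsupportedOperationException`.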
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 6d19ed6e4..ffda81304 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -18,46 +18,57 @@
package org.apache.paimon.flink.action.cdc.mongodb.strategy;
-import org.apache.paimon.flink.action.cdc.mongodb.JsonParserUtils;
-import org.apache.paimon.flink.action.cdc.mongodb.ModeEnum;
+import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import com.jayway.jsonpath.JsonPath;
import org.apache.flink.configuration.Configuration;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Optional;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.PARSER_PATH;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Processing strategies for different mongodb versions. */
+/** Interface for processing strategies tailored for different MongoDB versions. */
public interface MongoVersionStrategy {
ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ /**
+ * Extracts records from the provided JsonNode.
+ *
+ * @param root The root JsonNode containing the MongoDB record.
+ * @return A list of RichCdcMultiplexRecord extracted from the root node.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
List<RichCdcMultiplexRecord> extractRecords(JsonNode root) throws JsonProcessingException;
+ /**
+ * Extracts primary keys from the MongoDB record.
+ *
+ * @return A list of primary keys.
+ */
default List<String> extractPrimaryKeys() {
- List<String> primaryKeys = new ArrayList<>();
- primaryKeys.add("_id");
- return primaryKeys;
+ return Collections.singletonList("_id");
}
default Map<String, String> extractRow(String record) {
- return JsonParserUtils.extractMap(record);
+ return JsonSerdeUtil.parseJsonMap(record, String.class);
}
default Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
@@ -73,19 +84,30 @@ public interface MongoVersionStrategy {
return keyCaseInsensitive;
}
+ /**
+ * Determines the extraction mode and retrieves the row accordingly.
+ *
+ * @param jsonNode The JsonNode representing the MongoDB document.
+ * @param paimonFieldTypes A map to store the field types.
+ * @param caseSensitive Flag indicating if the extraction should be case-sensitive.
+ * @param mongodbConfig Configuration for the MongoDB connection.
+ * @return A map representing the extracted row.
+ * @throws JsonProcessingException If there's an error during JSON processing.
+ */
default Map<String, String> getExtractRow(
JsonNode jsonNode,
LinkedHashMap<String, DataType> paimonFieldTypes,
boolean caseSensitive,
Configuration mongodbConfig)
throws JsonProcessingException {
- ModeEnum mode = ModeEnum.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
+ SchemaAcquisitionMode mode =
+ SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
ObjectNode objectNode = (ObjectNode) OBJECT_MAPPER.readTree(jsonNode.asText());
JsonNode document = objectNode.set("_id", objectNode.get("_id").get("$oid"));
switch (mode) {
case SPECIFIED:
Map<String, String> specifiedRow =
- getSpecifiedRow(
+ parseFieldsFromJsonRecord(
document.toString(),
mongodbConfig.getString(PARSER_PATH),
mongodbConfig.getString(FIELD_NAME),
@@ -93,40 +115,51 @@ public interface MongoVersionStrategy {
return caseSensitive ? specifiedRow : keyCaseInsensitive(specifiedRow);
case DYNAMIC:
Map<String, String> dynamicRow =
- getDynamicRow(document.toString(), paimonFieldTypes);
+ parseAndTypeJsonRow(document.toString(), paimonFieldTypes);
return caseSensitive ? dynamicRow : keyCaseInsensitive(dynamicRow);
default:
- throw new RuntimeException();
+ throw new RuntimeException("Unsupported extraction mode: " + mode);
}
}
- default Map<String, String> getDynamicRow(
+ /**
+ * Parses a JSON string into a map and updates the data type mapping for each key.
+ *
+ * @param evaluate The JSON string to be parsed.
+ * @param paimonFieldTypes A map to store the data types of the keys.
+ * @return A map containing the parsed key-value pairs from the JSON string.
+ */
+ default Map<String, String> parseAndTypeJsonRow(
String evaluate, LinkedHashMap<String, DataType> paimonFieldTypes) {
- Map<String, String> linkedHashMap = JsonParserUtils.extractMap(evaluate);
- Set<String> keySet = linkedHashMap.keySet();
- String[] columns = keySet.toArray(new String[0]);
- for (String column : columns) {
+ Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate, String.class);
+ for (String column : parsedMap.keySet()) {
paimonFieldTypes.put(column, DataTypes.STRING());
}
return extractRow(evaluate);
}
- static Map<String, String> getSpecifiedRow(
+ /**
+ * Parses specified fields from a JSON record.
+ *
+ * @param record The JSON record to be parsed.
+ * @param fieldPaths The paths of the fields to be parsed from the JSON record.
+ * @param fieldNames The names of the fields to be returned in the result map.
+ * @param paimonFieldTypes A map to store the data types of the fields.
+ * @return A map containing the parsed fields and their values.
+ */
+ static Map<String, String> parseFieldsFromJsonRecord(
String record,
- String parsePath,
- String fileName,
+ String fieldPaths,
+ String fieldNames,
LinkedHashMap<String, DataType> paimonFieldTypes) {
Map<String, String> resultMap = new HashMap<>();
- String[] columnNames = fileName.split(",");
- String[] parseNames = parsePath.split(",");
+ String[] columnNames = fieldNames.split(",");
+ String[] parseNames = fieldPaths.split(",");
+
for (int i = 0; i < parseNames.length; i++) {
paimonFieldTypes.put(columnNames[i], DataTypes.STRING());
- String evaluate = JsonParserUtils.evaluate(record, "$." + parseNames[i]);
- if (evaluate == null) {
- resultMap.put(columnNames[i], "{}");
- } else {
- resultMap.put(columnNames[i], evaluate);
- }
+ String evaluate = JsonPath.read(record, parseNames[i]);
+ resultMap.put(columnNames[i], Optional.ofNullable(evaluate).orElse("{}"));
}
return resultMap;
}
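`parseFieldsFromJsonRecord` pairs the i-th entry of the comma-separated `parser.path` value with the i-th entry of `field.name`, types every column as `STRING`, and falls back to `"{}"` when the lookup yields `null`. The JDK-only sketch below reproduces that pairing as a hedged variant, not the committed code. `FieldSpecSketch`, `parseFields` and the `resolver` function (a stand-in for a JsonPath lookup) are hypothetical.

It differs from the committed code in two ways:
- It checks the name and path counts up front. The committed loop indexes `columnNames[i]` for every path, so it fails with `ArrayIndexOutOfBoundsException` when fewer names than paths are configured.
- It converts non-string values with `String.valueOf` instead of assigning them to a `String`. `JsonPath.read` is generic, so a `String`-typed assignment fails with `ClassCastException` when a path resolves to a number or a nested object. Under Jayway JsonPath's default configuration, a missing definite path also throws `PathNotFoundException` rather than returning `null`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical JDK-only sketch of the name/path pairing done by parseFieldsFromJsonRecord. */
final class FieldSpecSketch {

    /**
     * @param fieldNames comma-separated column names, as in "field.name"
     * @param fieldPaths comma-separated path expressions, as in "parser.path"
     * @param resolver hypothetical stand-in for a JsonPath lookup; null means "no value"
     * @param columnTypes receives one "STRING" entry per column, in declaration order
     */
    static Map<String, String> parseFields(
            String fieldNames,
            String fieldPaths,
            Function<String, Object> resolver,
            LinkedHashMap<String, String> columnTypes) {
        String[] names = fieldNames.split(",");
        String[] paths = fieldPaths.split(",");
        // Fail fast instead of hitting ArrayIndexOutOfBoundsException inside the loop.
        if (names.length != paths.length) {
            throw new IllegalArgumentException(
                    "field.name has " + names.length
                            + " entries but parser.path has " + paths.length);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < paths.length; i++) {
            columnTypes.put(names[i], "STRING");
            Object value = resolver.apply(paths[i]);
            // Same fallback as the committed code: a null result becomes "{}".
            // String.valueOf avoids a ClassCastException for non-string values.
            row.put(names[i], value == null ? "{}" : String.valueOf(value));
        }
        return row;
    }
}
```

With the ITCase values (`field.name` = `_id,name,description`, `parser.path` = `$._id,$.name,$.description`), the sketch registers three `STRING` columns and maps any path the resolver cannot answer to `"{}"`.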
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
deleted file mode 100644
index d1db139e8..000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/JsonParserUtilsTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.mongodb;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-/** UT cases for {@link JsonParserUtils}. */
-public class JsonParserUtilsTest {
-
- @Test
- public void testGetJsonObject() {
- String json =
- "{ \"kind\": \"youtube#videoListResponse\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", \"pageInfo\": { \"totalResults\": 1, \"resultsPerPage\": 1 }, \"items\": [ { \"kind\": \"youtube#video\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": \"29\" [...]
- String output = JsonParserUtils.evaluate(json, "$.pageInfo.totalResults");
- assertEquals("1", output);
- }
-
- @Test
- public void testNestedGetJsonObject() {
- String json =
- "{ \"kind\": \"youtube#videoListResponse\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", \"pageInfo\": { \"pagehit\":{ \"kind\": \"youtube#video\" } ,\"totalResults\": 1, \"resultsPerPage\": 1 }, \"items\": [ { \"kind\": \"youtube#video\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favo [...]
- String output = JsonParserUtils.evaluate(json, "$.pageInfo.pagehit");
- assertEquals("{\"kind\":\"youtube#video\"}", output);
- }
-
- @Test
- public void testStringWhenNotJson() {
- String json =
- "{ \"kind\": \"youtube#videoListResponse\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", \"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": \"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
- String output = JsonParserUtils.evaluate(json, "$.pageinfo.test_field");
- assertNull(output);
- }
-
- @Test
- public void testStringWhenJson() {
- String json =
- "{ \"kind\": \"youtube#videoListResponse\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"\", \"pageInfo\": \"page\", \"items\": [ { \"kind\": \"youtube#video\", \"etag\": \"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"\", \"id\": \"wHkPb68dxEw\", \"statistics\": { \"viewCount\": \"9211\", \"likeCount\": \"79\", \"dislikeCount\": \"11\", \"favoriteCount\": \"0\", \"commentCount\": \"29\" }, \"topicDetails\": { \"topicIds\": [ [...]
- String output = JsonParserUtils.evaluate(json, "$.pageInfo");
- assertEquals("page", output);
- }
-
- @Test
- public void testJsonArray() {
- String json =
- "{\"country\":\"Switzerland\",\"languages\": [ \"Italian\" ],\"religions\": \"christian\"}";
- String output = JsonParserUtils.evaluate(json, "$.languages[0]");
- assertEquals("Italian", output);
- }
-
- @Test
- public void testJsonMap() {
- String json =
- "{\"country\":\"Switzerland\",\"languages\":\"Italian\",\"religions\": { \"f\": \"v\", \"n\":null} }";
- String output = JsonParserUtils.evaluate(json, "$.religions.f");
- assertEquals("v", output);
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
index 10eaf9d3f..969a5fc25 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
@@ -48,10 +48,6 @@ public class MongoDBContainer extends org.testcontainers.containers.MongoDBConta
public static final int MONGODB_PORT = 27017;
- public static final String FLINK_USER = "flinkuser";
-
- public static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
-
public MongoDBContainer(String imageName) {
super(imageName);
}
@@ -142,12 +138,6 @@ public class MongoDBContainer extends org.testcontainers.containers.MongoDBConta
}
}
- /** Executes a mongo command in separate database. */
- public String executeCommandInSeparateDatabase(String command, String baseName) {
- return executeCommandInDatabase(
- command, baseName + "_" + Integer.toUnsignedString(new Random().nextInt(), 36));
- }
-
/** Executes a mongo command file in separate database. */
public String executeCommandFileInSeparateDatabase(
String fileNameIgnoreSuffix, String content) {
@@ -171,7 +161,6 @@ public class MongoDBContainer extends org.testcontainers.containers.MongoDBConta
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
- // use database;
String command0 = String.format("db = db.getSiblingDB('%s');\n", dbName);
String command1 =
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
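The remaining file-based helper in `MongoDBContainer` prepends `db = db.getSiblingDB('<dbName>');` to the lines of a test script, so that every command in the file runs against the chosen database. The sketch below shows that prefixing step. `MongoScriptSketch`, `buildScript` and `separateDatabaseName` are hypothetical names. This excerpt cuts off how the committed code joins the file lines, so joining them with newlines is an assumption. The name suffix borrows the scheme from the removed `executeCommandInSeparateDatabase` helper for illustration only.

```java
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of building a mongo shell script scoped to one database. */
final class MongoScriptSketch {

    /** Suffix scheme taken from the removed executeCommandInSeparateDatabase helper. */
    static String separateDatabaseName(String baseName, Random random) {
        return baseName + "_" + Integer.toUnsignedString(random.nextInt(), 36);
    }

    /** Prefixes the script with a database switch; newline joining is an assumption. */
    static String buildScript(String dbName, List<String> scriptLines) {
        String switchDb = String.format("db = db.getSiblingDB('%s');\n", dbName);
        return switchDb + String.join("\n", scriptLines);
    }
}
```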
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 4c5b822cf..ad213b5b2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -36,7 +36,7 @@ import java.util.Map;
public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
@Test
- @Timeout(120)
+ @Timeout(60)
public void testSchemaEvolution() throws Exception {
writeRecordsToMongoDB("test-data-1", database, "database");
writeRecordsToMongoDB("test-data-2", database, "database");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 87e5906e7..e62f1c815 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -135,7 +135,7 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
mongodbConfig.put("database", inventory);
mongodbConfig.put("collection", "products");
mongodbConfig.put("field.name", "_id,name,description");
- mongodbConfig.put("parser.path", "_id,name,description");
+ mongodbConfig.put("parser.path", "$._id,$.name,$.description");
mongodbConfig.put("schema.start.mode", "specified");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
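The ITCase selects the `specified` mode through `schema.start.mode`. `getExtractRow` turns that string into a `SchemaAcquisitionMode` with `valueOf(...toUpperCase())`. The sketch below mirrors that lookup with a local enum holding only the two constants the switch handles (`SPECIFIED`, `DYNAMIC`). `ModeSketch`, its `Mode` enum and `parseMode` are hypothetical stand-ins. It is also an assumption, based on this test alone, that `START_MODE` resolves to the `schema.start.mode` key.

Two consequences are worth noting. First, `Enum.valueOf` already throws `IllegalArgumentException` for an unknown name, so the `default` branch in `getExtractRow` is only reachable if the enum gains a constant without a matching `case`. Second, the no-argument `toUpperCase()` uses the JVM's default locale. Under a Turkish locale, for example, `specified` becomes `SPECİFİED` and the lookup fails, so the sketch passes `Locale.ROOT`.

```java
import java.util.Locale;

/** Hypothetical sketch of resolving the schema acquisition mode from a config string. */
final class ModeSketch {

    /** Local stand-in mirroring the two modes handled by getExtractRow. */
    enum Mode { SPECIFIED, DYNAMIC }

    static Mode parseMode(String configured) {
        // Locale.ROOT keeps "specified" -> "SPECIFIED" independent of the default locale.
        // Unknown names still fail fast with IllegalArgumentException from valueOf.
        return Mode.valueOf(configured.toUpperCase(Locale.ROOT));
    }
}
```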