This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3883dc3e2 [flink] Mongodb CDC Ingestion support computed column (#1924)
3883dc3e2 is described below

commit 3883dc3e2b8abfb890239fba8e89756fdbdb2c8c
Author: monster <[email protected]>
AuthorDate: Thu Sep 7 14:57:38 2023 +0800

    [flink] Mongodb CDC Ingestion support computed column (#1924)
---
 docs/content/how-to/cdc-ingestion.md               |  2 +
 docs/content/maintenance/manage-tags.md            |  1 +
 .../shortcodes/generated/mongodb_operator.html     |  4 +-
 .../shortcodes/generated/mongodb_path_example.html |  4 +-
 .../generated/mongodb_sync_database.html           |  2 +-
 .../shortcodes/generated/mongodb_sync_table.html   |  6 +-
 .../action/cdc/mongodb/MongoDBActionUtils.java     |  4 +-
 .../action/cdc/mongodb/MongoDBRecordParser.java    | 20 +++--
 .../action/cdc/mongodb/MongoDBSyncTableAction.java | 94 ++++++++++++++++++----
 .../cdc/mongodb/MongoDBSyncTableActionFactory.java | 45 +++++------
 .../mongodb/strategy/Mongo4VersionStrategy.java    | 18 ++++-
 .../cdc/mongodb/strategy/MongoVersionStrategy.java | 72 +++++++++++------
 .../cdc/mongodb/MongoDBActionITCaseBase.java       | 14 +---
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  | 27 +++++++
 .../mongodb/table/computedcolumn/test-table-1.js   | 19 +++++
 15 files changed, 241 insertions(+), 91 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index ea3a33fc1..217870728 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -447,6 +447,7 @@ To use this feature through `flink run`, run the following 
shell command.
     --database <database-name> \
     --table <table-name> \
     [--partition-keys <partition-keys>] \
+    [--computed-column <'column-name=expr-name(args[, ...])'> 
[--computed-column ...]] \
     [--mongodb-conf <mongodb-cdc-source-conf> [--mongodb-conf 
<mongodb-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf 
<paimon-table-sink-conf> ...]]
@@ -535,6 +536,7 @@ Example 1: synchronize collection into one Paimon table
     --database test_db \
     --table test_table \
     --partition-keys pt \
+    --computed-column '_year=year(age)' \
     --mongodb-conf hosts=127.0.0.1:27017 \
     --mongodb-conf username=root \
     --mongodb-conf password=123456 \
diff --git a/docs/content/maintenance/manage-tags.md 
b/docs/content/maintenance/manage-tags.md
index 1bb97535a..cfd9422ca 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -176,6 +176,7 @@ public class DeleteTag {
 
 {{< /tab >}}
 
+
 {{< tab "Spark" >}}
 Run the following sql:
 ```sql
diff --git a/docs/layouts/shortcodes/generated/mongodb_operator.html 
b/docs/layouts/shortcodes/generated/mongodb_operator.html
index 2f4f99197..aa62d2f32 100644
--- a/docs/layouts/shortcodes/generated/mongodb_operator.html
+++ b/docs/layouts/shortcodes/generated/mongodb_operator.html
@@ -20,8 +20,8 @@ under the License.
 <table class="configuration table table-bordered">
     <thead>
     <tr>
-        <th class="text-left" style="width: 15%">Operator</th>
-        <th class="text-left" style="width: 85%">Description</th>
+        <th class="text-left" style="width: 20%">Operator</th>
+        <th class="text-left" style="width: 80%">Description</th>
     </tr>
     </thead>
     <tbody>
diff --git a/docs/layouts/shortcodes/generated/mongodb_path_example.html 
b/docs/layouts/shortcodes/generated/mongodb_path_example.html
index fb1c6a874..6f5922578 100644
--- a/docs/layouts/shortcodes/generated/mongodb_path_example.html
+++ b/docs/layouts/shortcodes/generated/mongodb_path_example.html
@@ -20,8 +20,8 @@ under the License.
 <table class="configuration table table-bordered">
     <thead>
     <tr>
-        <th class="text-left" style="width: 15%">JsonPath</th>
-        <th class="text-left" style="width: 85%">Result</th>
+        <th class="text-left" style="width: 45%">JsonPath</th>
+        <th class="text-left" style="width: 55%">Result</th>
     </tr>
     </thead>
     <tbody>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html 
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
index a7c9ae94f..235912def 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -51,7 +51,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--mongodb-conf</h5></td>
-        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hosts, username, 
password, database are required configurations, others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_table.html 
b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
index 440580eb7..384bd6e35 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
@@ -41,9 +41,13 @@ under the License.
         <td><h5>--partition-keys</h5></td>
         <td>The partition keys for Paimon table. If there are multiple 
partition keys, connect them with comma, for example "dt,hh,mm".</td>
     </tr>
+    <tr>
+        <td><h5>--computed-column</h5></td>
+        <td>The definitions of computed columns. The argument field comes 
from the MongoDB collection field name. See <a 
href="#computed-functions">here</a> for a complete list of configurations.</td>
+    </tr>
     <tr>
         <td><h5>--mongodb-conf</h5></td>
-        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
+        <td>The configuration for Flink CDC MongoDB sources. Each 
configuration should be specified in the format "key=value". hosts, username, 
password, database and collection are required configurations, others are 
optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 4b18bc832..fefc4a687 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
@@ -161,6 +162,7 @@ public class MongoDBActionUtils {
     static Schema buildPaimonSchema(
             MongodbSchema mongodbSchema,
             List<String> specifiedPartitionKeys,
+            List<ComputedColumn> computedColumns,
             Map<String, String> tableConfig,
             boolean caseSensitive) {
         LinkedHashMap<String, DataType> sourceColumns =
@@ -172,7 +174,7 @@ public class MongoDBActionUtils {
         return CdcActionCommonUtils.buildPaimonSchema(
                 specifiedPartitionKeys,
                 Lists.newArrayList(PRIMARY_KEY),
-                Collections.emptyList(),
+                computedColumns,
                 tableConfig,
                 sourceColumns,
                 null,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index a8aad81af..3f2c33f8e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
@@ -30,6 +31,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * A parser for MongoDB Debezium JSON strings, converting them into a list of 
{@link
  * RichCdcMultiplexRecord}s.
@@ -55,22 +59,27 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
     private static final String FIELD_TABLE = "coll";
     private static final String FIELD_NAMESPACE = "ns";
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
+    private final List<ComputedColumn> computedColumns;
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final Configuration mongodbConfig;
     private JsonNode root;
 
-    public MongoDBRecordParser(boolean caseSensitive, Configuration 
mongodbConfig) {
-        this(caseSensitive, new TableNameConverter(caseSensitive), 
mongodbConfig);
+    public MongoDBRecordParser(
+            boolean caseSensitive,
+            TableNameConverter tableNameConverter,
+            Configuration mongodbConfig) {
+        this(caseSensitive, tableNameConverter, Collections.emptyList(), 
mongodbConfig);
     }
 
     public MongoDBRecordParser(
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
+            List<ComputedColumn> computedColumns,
             Configuration mongodbConfig) {
         this.caseSensitive = caseSensitive;
         this.tableNameConverter = tableNameConverter;
+        this.computedColumns = computedColumns;
         this.mongodbConfig = mongodbConfig;
     }
 
@@ -81,7 +90,7 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
         String collection = 
tableNameConverter.convert(extractString(FIELD_TABLE));
         MongoVersionStrategy versionStrategy =
                 VersionStrategyFactory.create(
-                        databaseName, collection, caseSensitive, 
mongodbConfig);
+                        databaseName, collection, caseSensitive, 
computedColumns, mongodbConfig);
         versionStrategy.extractRecords(root).forEach(out::collect);
     }
 
@@ -94,13 +103,14 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
                 String databaseName,
                 String collection,
                 boolean caseSensitive,
+                List<ComputedColumn> computedColumns,
                 Configuration mongodbConfig) {
             // TODO: When MongoDB CDC is upgraded to 2.5, uncomment the 
version check logic
             // if (mongodbVersion >= 6) {
             //     return new Mongo6VersionStrategy(databaseName, collection, 
caseSensitive);
             // }
             return new Mongo4VersionStrategy(
-                    databaseName, collection, caseSensitive, mongodbConfig);
+                    databaseName, collection, caseSensitive, computedColumns, 
mongodbConfig);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 26940ac83..336fb0fc1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -35,9 +37,13 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -66,21 +72,68 @@ public class MongoDBSyncTableAction extends ActionBase {
     private final String table;
     private final List<String> partitionKeys;
     private final Map<String, String> tableConfig;
+    private final List<String> computedColumnArgs;
+
+    private MongoDBSyncTableAction(Builder builder) {
+        super(builder.warehouse, builder.catalogConfig);
+        this.mongodbConfig = builder.mongodbConfig;
+        this.database = builder.database;
+        this.table = builder.table;
+        this.partitionKeys = builder.partitionKeys;
+        this.tableConfig = builder.tableConfig;
+        this.computedColumnArgs = builder.computedColumnArgs;
+    }
+
+    /**
+     * Builder class for constructing MongoDBSyncTableAction instances. This 
class follows the
+     * Builder design pattern, allowing for a more readable and maintainable 
way to set up complex
+     * objects.
+     */
+    public static class Builder {
+        private final String warehouse;
+        private final String database;
+        private final String table;
+        private final Configuration mongodbConfig;
+        private final Map<String, String> catalogConfig;
+        private List<String> partitionKeys = new ArrayList<>();
+        private Map<String, String> tableConfig = new HashMap<>();
+        private List<String> computedColumnArgs = new ArrayList<>();
+
+        public Builder(
+                String warehouse,
+                String database,
+                String table,
+                Map<String, String> catalogConfig,
+                Map<String, String> mongodbConfig) {
+            this.warehouse = warehouse;
+            this.database = database;
+            this.table = table;
+            this.catalogConfig = catalogConfig;
+            this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+        }
+
+        public Builder withPartitionKeys(List<String> partitionKeys) {
+            this.partitionKeys = partitionKeys;
+            return this;
+        }
+
+        public Builder withPartitionKeys(String... partitionKeys) {
+            return withPartitionKeys(Arrays.asList(partitionKeys));
+        }
+
+        public Builder withTableConfig(Map<String, String> tableConfig) {
+            this.tableConfig = tableConfig;
+            return this;
+        }
 
-    public MongoDBSyncTableAction(
-            Map<String, String> mongodbConfig,
-            String warehouse,
-            String database,
-            String table,
-            List<String> partitionKeys,
-            Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
-        super(warehouse, catalogConfig);
-        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
-        this.database = database;
-        this.table = table;
-        this.partitionKeys = partitionKeys;
-        this.tableConfig = tableConfig;
+        public Builder withComputedColumnArgs(List<String> computedColumnArgs) 
{
+            this.computedColumnArgs = computedColumnArgs;
+            return this;
+        }
+
+        public MongoDBSyncTableAction buildAction() {
+            return new MongoDBSyncTableAction(this);
+        }
     }
 
     @Override
@@ -106,6 +159,8 @@ public class MongoDBSyncTableAction extends ActionBase {
 
         MongodbSchema mongodbSchema = 
MongodbSchema.getMongodbSchema(mongodbConfig);
         catalog.createDatabase(database, true);
+        List<ComputedColumn> computedColumns =
+                buildComputedColumns(computedColumnArgs, 
mongodbSchema.fields());
 
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
@@ -116,7 +171,11 @@ public class MongoDBSyncTableAction extends ActionBase {
         } else {
             Schema fromMongodb =
                     MongoDBActionUtils.buildPaimonSchema(
-                            mongodbSchema, partitionKeys, tableConfig, 
caseSensitive);
+                            mongodbSchema,
+                            partitionKeys,
+                            computedColumns,
+                            tableConfig,
+                            caseSensitive);
             catalog.createTable(identifier, fromMongodb, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
@@ -133,7 +192,10 @@ public class MongoDBSyncTableAction extends ActionBase {
                                                 "MongoDB Source")
                                         .flatMap(
                                                 new MongoDBRecordParser(
-                                                        caseSensitive, 
mongodbConfig)))
+                                                        caseSensitive,
+                                                        new 
TableNameConverter(caseSensitive),
+                                                        computedColumns,
+                                                        mongodbConfig)))
                         .withParserFactory(parserFactory)
                         .withTable(table)
                         .withIdentifier(identifier)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index 96249895a..b9612a7df 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -24,12 +24,8 @@ import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /** Factory to create {@link MongoDBSyncTableAction}. */
 public class MongoDBSyncTableActionFactory implements ActionFactory {
@@ -44,29 +40,26 @@ public class MongoDBSyncTableActionFactory implements 
ActionFactory {
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
+        checkRequiredArgument(params, "mongodb-conf");
+
+        MongoDBSyncTableAction.Builder builder =
+                new MongoDBSyncTableAction.Builder(
+                                tablePath.f0,
+                                tablePath.f1,
+                                tablePath.f2,
+                                optionalConfigMap(params, "catalog-conf"),
+                                optionalConfigMap(params, "mongodb-conf"))
+                        .withTableConfig(optionalConfigMap(params, 
"table-conf"));
 
-        List<String> partitionKeys = Collections.emptyList();
         if (params.has("partition-keys")) {
-            partitionKeys =
-                    Arrays.stream(params.get("partition-keys").split(","))
-                            .collect(Collectors.toList());
+            builder.withPartitionKeys(params.get("partition-keys").split(","));
         }
 
-        checkRequiredArgument(params, "mongodb-conf");
-
-        Map<String, String> mongodbConfig = optionalConfigMap(params, 
"mongodb-conf");
-        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfig = optionalConfigMap(params, 
"table-conf");
-
-        return Optional.of(
-                new MongoDBSyncTableAction(
-                        mongodbConfig,
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        partitionKeys,
-                        catalogConfig,
-                        tableConfig));
+        if (params.has("computed-column")) {
+            builder.withComputedColumnArgs(
+                    new 
ArrayList<>(params.getMultiParameter("computed-column")));
+        }
+        return Optional.ofNullable(builder.buildAction());
     }
 
     @Override
@@ -81,6 +74,7 @@ public class MongoDBSyncTableActionFactory implements 
ActionFactory {
                 "  mongodb-sync-table --warehouse <warehouse-path> --database 
<database-name> "
                         + "--table <table-name> "
                         + "[--partition-keys <partition-keys>] "
+                        + "[--computed-column <'column-name=expr-name(args[, 
...])'> [--computed-column ...]] "
                         + "[--mongodb-conf <mongodb-cdc-source-conf> 
[--mongodb-conf <mongodb-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> 
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -93,6 +87,9 @@ public class MongoDBSyncTableActionFactory implements 
ActionFactory {
                         + "this action will automatically create an 
unpartitioned Paimon table.");
         System.out.println();
 
+        System.out.println("Please see doc for usage of --computed-column.");
+        System.out.println();
+
         System.out.println("mongodb CDC source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 22887109a..028af46f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
@@ -49,16 +50,19 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
     private final String collection;
     private final boolean caseSensitive;
     private final Configuration mongodbConfig;
+    private final List<ComputedColumn> computedColumns;
 
     public Mongo4VersionStrategy(
             String databaseName,
             String collection,
             boolean caseSensitive,
+            List<ComputedColumn> computedColumns,
             Configuration mongodbConfig) {
         this.databaseName = databaseName;
         this.collection = collection;
         this.caseSensitive = caseSensitive;
         this.mongodbConfig = mongodbConfig;
+        this.computedColumns = computedColumns;
     }
 
     /**
@@ -115,7 +119,12 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
             JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
             throws JsonProcessingException {
         Map<String, String> insert =
-                getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, 
mongodbConfig);
+                getExtractRow(
+                        fullDocument,
+                        paimonFieldTypes,
+                        caseSensitive,
+                        computedColumns,
+                        mongodbConfig);
         return new RichCdcMultiplexRecord(
                 databaseName,
                 collection,
@@ -136,7 +145,12 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
             JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
             throws JsonProcessingException {
         Map<String, String> after =
-                getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, 
mongodbConfig);
+                getExtractRow(
+                        fullDocument,
+                        paimonFieldTypes,
+                        caseSensitive,
+                        computedColumns,
+                        mongodbConfig);
         return new RichCdcMultiplexRecord(
                 databaseName,
                 collection,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 8582ec348..1aa048fd1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
@@ -86,6 +87,7 @@ public interface MongoVersionStrategy {
             JsonNode jsonNode,
             LinkedHashMap<String, DataType> paimonFieldTypes,
             boolean caseSensitive,
+            List<ComputedColumn> computedColumns,
             Configuration mongodbConfig)
             throws JsonProcessingException {
         SchemaAcquisitionMode mode =
@@ -100,10 +102,16 @@ public interface MongoVersionStrategy {
                                 document.toString(),
                                 mongodbConfig.getString(PARSER_PATH),
                                 mongodbConfig.getString(FIELD_NAME),
+                                computedColumns,
                                 paimonFieldTypes);
                 break;
             case DYNAMIC:
-                row = parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes, caseSensitive);
+                row =
+                        parseAndTypeJsonRow(
+                                document.toString(),
+                                paimonFieldTypes,
+                                computedColumns,
+                                caseSensitive);
                 break;
             default:
                 throw new RuntimeException("Unsupported extraction mode: " + 
mode);
@@ -111,47 +119,59 @@ public interface MongoVersionStrategy {
         return mapKeyCaseConvert(row, caseSensitive, 
recordKeyDuplicateErrMsg(row));
     }
 
-    /**
-     * Parses a JSON string into a map and updates the data type mapping for 
each key.
-     *
-     * @param evaluate The JSON string to be parsed.
-     * @param paimonFieldTypes A map to store the data types of the keys.
-     * @return A map containing the parsed key-value pairs from the JSON 
string.
-     */
+    /** Parses and types a JSON row based on the given parameters. */
     default Map<String, String> parseAndTypeJsonRow(
             String evaluate,
             LinkedHashMap<String, DataType> paimonFieldTypes,
+            List<ComputedColumn> computedColumns,
             boolean caseSensitive) {
-        Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
-        for (String column : parsedMap.keySet()) {
-            paimonFieldTypes.put(caseSensitive ? column : 
column.toLowerCase(), DataTypes.STRING());
-        }
-        return extractRow(evaluate);
+        Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
+        return processParsedData(parsedRow, paimonFieldTypes, computedColumns, 
caseSensitive);
     }
 
-    /**
-     * Parses specified fields from a JSON record.
-     *
-     * @param record The JSON record to be parsed.
-     * @param fieldPaths The paths of the fields to be parsed from the JSON 
record.
-     * @param fieldNames The names of the fields to be returned in the result 
map.
-     * @param paimonFieldTypes A map to store the data types of the fields.
-     * @return A map containing the parsed fields and their values.
-     */
+    /** Parses fields from a JSON record based on the given parameters. */
     static Map<String, String> parseFieldsFromJsonRecord(
             String record,
             String fieldPaths,
             String fieldNames,
-            LinkedHashMap<String, DataType> paimonFieldTypes) {
-        Map<String, String> resultMap = new HashMap<>();
+            List<ComputedColumn> computedColumns,
+            LinkedHashMap<String, DataType> fieldTypes) {
         String[] columnNames = fieldNames.split(",");
         String[] parseNames = fieldPaths.split(",");
+        Map<String, String> parsedRow = new HashMap<>();
 
         for (int i = 0; i < parseNames.length; i++) {
-            paimonFieldTypes.put(columnNames[i], DataTypes.STRING());
             String evaluate = JsonPath.read(record, parseNames[i]);
-            resultMap.put(columnNames[i], 
Optional.ofNullable(evaluate).orElse("{}"));
+            parsedRow.put(columnNames[i], 
Optional.ofNullable(evaluate).orElse("{}"));
         }
+
+        return processParsedData(parsedRow, fieldTypes, computedColumns, 
false);
+    }
+
+    /** Processes the parsed data to generate the result map and update field 
types. */
+    static Map<String, String> processParsedData(
+            Map<String, String> parsedRow,
+            LinkedHashMap<String, DataType> fieldTypes,
+            List<ComputedColumn> computedColumns,
+            boolean caseSensitive) {
+        int initialCapacity = parsedRow.size() + computedColumns.size();
+        Map<String, String> resultMap = new HashMap<>(initialCapacity);
+
+        parsedRow.forEach(
+                (column, value) -> {
+                    String key = caseSensitive ? column : column.toLowerCase();
+                    fieldTypes.putIfAbsent(key, DataTypes.STRING());
+                    resultMap.put(key, value);
+                });
+        computedColumns.forEach(
+                computedColumn -> {
+                    String columnName = computedColumn.columnName();
+                    String fieldReference = computedColumn.fieldReference();
+                    String computedValue = 
computedColumn.eval(parsedRow.get(fieldReference));
+
+                    resultMap.put(columnName, computedValue);
+                    fieldTypes.put(columnName, computedColumn.columnType());
+                });
         return resultMap;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 8976d6aa4..53e8d11eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -72,8 +72,8 @@ public abstract class MongoDBActionITCaseBase extends 
CdcActionITCaseBase {
         return 
MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, content);
     }
 
-    protected static String writeRecordsToMongoDB(String fileName, String 
dbName, String content) {
-        return 
MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, 
content);
    /**
     * Executes the given command file against a dedicated MongoDB database.
     *
     * <p>Fire-and-forget variant: the container call's return value is intentionally discarded
     * because callers of this overload do not need it.
     *
     * @param fileName name of the command file resource to execute
     * @param dbName target database name
     * @param content content substituted into the command file
     */
    protected static void writeRecordsToMongoDB(String fileName, String dbName, String content) {
        MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, content);
    }
 
     protected MongoDBSyncTableActionBuilder syncTableActionBuilder(
@@ -98,15 +98,6 @@ public abstract class MongoDBActionITCaseBase extends 
CdcActionITCaseBase {
             throw new UnsupportedOperationException();
         }
 
-        public MongoDBSyncTableActionBuilder withComputedColumnArgs(String... 
computedColumnArgs) {
-            throw new UnsupportedOperationException();
-        }
-
-        public MongoDBSyncTableActionBuilder withComputedColumnArgs(
-                List<String> computedColumnArgs) {
-            throw new UnsupportedOperationException();
-        }
-
         public MongoDBSyncTableActionBuilder withTypeMappingModes(String... 
typeMappingModes) {
             throw new UnsupportedOperationException();
         }
@@ -125,6 +116,7 @@ public abstract class MongoDBActionITCaseBase extends 
CdcActionITCaseBase {
             args.addAll(mapToArgs("--mongodb-conf", sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
             args.addAll(mapToArgs("--table-conf", tableConfig));
+            args.addAll(listToArgs("--computed-column", computedColumnArgs));
 
             args.addAll(listToArgs("--partition-keys", partitionKeys));
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 97257f671..75642b89b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -163,4 +163,31 @@ public class MongoDBSyncTableActionITCase extends 
MongoDBActionITCaseBase {
         assertThat(action.tableConfig())
                 
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
+
    @Test
    @Timeout(60)
    public void testComputedColumn() throws Exception {
        // Seed the source collection from the computed-column fixture script
        // (resources/mongodb/table/computedcolumn/test-table-1.js).
        writeRecordsToMongoDB("test-table-1", database, "table/computedcolumn");
        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
        mongodbConfig.put("database", database);
        mongodbConfig.put("collection", "test_computed_column");

        // Declare a computed column "_year" derived from the source field "_date"
        // via the built-in year() expression.
        MongoDBSyncTableAction action =
                syncTableActionBuilder(mongodbConfig)
                        .withTableConfig(getBasicTableConfig())
                        .withComputedColumnArgs("_year=year(_date)")
                        .build();
        runActionWithDefaultEnv(action);
        // Expected schema: source fields ingested as STRING, computed "_year" as INT.
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT()
                        },
                        new String[] {"_id", "_date", "_year"});
        // The single seeded document ("2023-03-23") must yield _year = 2023.
        waitForResult(
                Collections.singletonList("+I[100000000000000000000101, 2023-03-23, 2023]"),
                getFileStoreTable(tableName),
                rowType,
                Collections.singletonList("_id"));
    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js
 
b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js
new file mode 100644
index 000000000..48b5398fa
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('test_computed_column').insertOne({
+    "_id": ObjectId("100000000000000000000101"),
+    "_date": "2023-03-23"
+});


Reply via email to