This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3883dc3e2 [flink] Mongodb CDC Ingestion support computed column (#1924)
3883dc3e2 is described below
commit 3883dc3e2b8abfb890239fba8e89756fdbdb2c8c
Author: monster <[email protected]>
AuthorDate: Thu Sep 7 14:57:38 2023 +0800
[flink] Mongodb CDC Ingestion support computed column (#1924)
---
docs/content/how-to/cdc-ingestion.md | 2 +
docs/content/maintenance/manage-tags.md | 1 +
.../shortcodes/generated/mongodb_operator.html | 4 +-
.../shortcodes/generated/mongodb_path_example.html | 4 +-
.../generated/mongodb_sync_database.html | 2 +-
.../shortcodes/generated/mongodb_sync_table.html | 6 +-
.../action/cdc/mongodb/MongoDBActionUtils.java | 4 +-
.../action/cdc/mongodb/MongoDBRecordParser.java | 20 +++--
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 94 ++++++++++++++++++----
.../cdc/mongodb/MongoDBSyncTableActionFactory.java | 45 +++++------
.../mongodb/strategy/Mongo4VersionStrategy.java | 18 ++++-
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 72 +++++++++++------
.../cdc/mongodb/MongoDBActionITCaseBase.java | 14 +---
.../cdc/mongodb/MongoDBSyncTableActionITCase.java | 27 +++++++
.../mongodb/table/computedcolumn/test-table-1.js | 19 +++++
15 files changed, 241 insertions(+), 91 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index ea3a33fc1..217870728 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -447,6 +447,7 @@ To use this feature through `flink run`, run the following shell command.
--database <database-name> \
--table <table-name> \
[--partition-keys <partition-keys>] \
+ [--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--mongodb-conf <mongodb-cdc-source-conf> [--mongodb-conf <mongodb-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
@@ -535,6 +536,7 @@ Example 1: synchronize collection into one Paimon table
--database test_db \
--table test_table \
--partition-keys pt \
+ --computed-column '_year=year(age)' \
--mongodb-conf hosts=127.0.0.1:27017 \
--mongodb-conf username=root \
--mongodb-conf password=123456 \
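The new `--computed-column` flag takes expressions of the form 'column-name=expr-name(args)'; for instance `_year=year(_date)` derives a `_year` column from a `_date` source field. A minimal sketch of what a year() expression conceptually evaluates (illustrative Java, not Paimon's internal implementation):

    import java.time.LocalDate;

    public class YearExpressionSketch {
        // "2023-03-23" -> "2023": derive the year component of an ISO date string.
        static String evalYear(String dateValue) {
            return String.valueOf(LocalDate.parse(dateValue).getYear());
        }

        public static void main(String[] args) {
            System.out.println(evalYear("2023-03-23")); // prints 2023
        }
    }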
diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md
index 1bb97535a..cfd9422ca 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -176,6 +176,7 @@ public class DeleteTag {
{{< /tab >}}
+
{{< tab "Spark" >}}
Run the following sql:
```sql
diff --git a/docs/layouts/shortcodes/generated/mongodb_operator.html b/docs/layouts/shortcodes/generated/mongodb_operator.html
index 2f4f99197..aa62d2f32 100644
--- a/docs/layouts/shortcodes/generated/mongodb_operator.html
+++ b/docs/layouts/shortcodes/generated/mongodb_operator.html
@@ -20,8 +20,8 @@ under the License.
<table class="configuration table table-bordered">
<thead>
<tr>
- <th class="text-left" style="width: 15%">Operator</th>
- <th class="text-left" style="width: 85%">Description</th>
+ <th class="text-left" style="width: 20%">Operator</th>
+ <th class="text-left" style="width: 80%">Description</th>
</tr>
</thead>
<tbody>
diff --git a/docs/layouts/shortcodes/generated/mongodb_path_example.html b/docs/layouts/shortcodes/generated/mongodb_path_example.html
index fb1c6a874..6f5922578 100644
--- a/docs/layouts/shortcodes/generated/mongodb_path_example.html
+++ b/docs/layouts/shortcodes/generated/mongodb_path_example.html
@@ -20,8 +20,8 @@ under the License.
<table class="configuration table table-bordered">
<thead>
<tr>
- <th class="text-left" style="width: 15%">JsonPath</th>
- <th class="text-left" style="width: 85%">Result</th>
+ <th class="text-left" style="width: 45%">JsonPath</th>
+ <th class="text-left" style="width: 55%">Result</th>
</tr>
</thead>
<tbody>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
index a7c9ae94f..235912def 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -51,7 +51,7 @@ under the License.
</tr>
<tr>
<td><h5>--mongodb-conf</h5></td>
- <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password and database are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_table.html b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
index 440580eb7..384bd6e35 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_table.html
@@ -41,9 +41,13 @@ under the License.
<td><h5>--partition-keys</h5></td>
<td>The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm".</td>
</tr>
+ <tr>
+ <td><h5>--computed-column</h5></td>
+ <td>The definitions of computed columns. The argument field comes from the MongoDB collection field name. See <a href="#computed-functions">here</a> for a complete list of computed functions.</td>
+ </tr>
<tr>
<td><h5>--mongodb-conf</h5></td>
- <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+ <td>The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 4b18bc832..fefc4a687 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
@@ -161,6 +162,7 @@ public class MongoDBActionUtils {
static Schema buildPaimonSchema(
MongodbSchema mongodbSchema,
List<String> specifiedPartitionKeys,
+ List<ComputedColumn> computedColumns,
Map<String, String> tableConfig,
boolean caseSensitive) {
LinkedHashMap<String, DataType> sourceColumns =
@@ -172,7 +174,7 @@ public class MongoDBActionUtils {
return CdcActionCommonUtils.buildPaimonSchema(
specifiedPartitionKeys,
Lists.newArrayList(PRIMARY_KEY),
- Collections.emptyList(),
+ computedColumns,
tableConfig,
sourceColumns,
null,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index a8aad81af..3f2c33f8e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
import org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
@@ -30,6 +31,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+import java.util.Collections;
+import java.util.List;
+
/**
* A parser for MongoDB Debezium JSON strings, converting them into a list of {@link
* RichCdcMultiplexRecord}s.
@@ -55,22 +59,27 @@ public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMulti
private static final String FIELD_TABLE = "coll";
private static final String FIELD_NAMESPACE = "ns";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
+ private final List<ComputedColumn> computedColumns;
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final Configuration mongodbConfig;
private JsonNode root;
- public MongoDBRecordParser(boolean caseSensitive, Configuration mongodbConfig) {
- this(caseSensitive, new TableNameConverter(caseSensitive), mongodbConfig);
+ public MongoDBRecordParser(
+ boolean caseSensitive,
+ TableNameConverter tableNameConverter,
+ Configuration mongodbConfig) {
+ this(caseSensitive, tableNameConverter, Collections.emptyList(), mongodbConfig);
}
public MongoDBRecordParser(
boolean caseSensitive,
TableNameConverter tableNameConverter,
+ List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.caseSensitive = caseSensitive;
this.tableNameConverter = tableNameConverter;
+ this.computedColumns = computedColumns;
this.mongodbConfig = mongodbConfig;
}
@@ -81,7 +90,7 @@ public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMulti
String collection = tableNameConverter.convert(extractString(FIELD_TABLE));
MongoVersionStrategy versionStrategy =
VersionStrategyFactory.create(
- databaseName, collection, caseSensitive, mongodbConfig);
+ databaseName, collection, caseSensitive, computedColumns, mongodbConfig);
versionStrategy.extractRecords(root).forEach(out::collect);
}
@@ -94,13 +103,14 @@ public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMulti
String databaseName,
String collection,
boolean caseSensitive,
+ List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
// TODO: When MongoDB CDC is upgraded to 2.5, uncomment the version check logic
// if (mongodbVersion >= 6) {
//     return new Mongo6VersionStrategy(databaseName, collection, caseSensitive);
// }
return new Mongo4VersionStrategy(
- databaseName, collection, caseSensitive, mongodbConfig);
+ databaseName, collection, caseSensitive, computedColumns, mongodbConfig);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 26940ac83..336fb0fc1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -35,9 +37,13 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -66,21 +72,68 @@ public class MongoDBSyncTableAction extends ActionBase {
private final String table;
private final List<String> partitionKeys;
private final Map<String, String> tableConfig;
+ private final List<String> computedColumnArgs;
+
+ private MongoDBSyncTableAction(Builder builder) {
+ super(builder.warehouse, builder.catalogConfig);
+ this.mongodbConfig = builder.mongodbConfig;
+ this.database = builder.database;
+ this.table = builder.table;
+ this.partitionKeys = builder.partitionKeys;
+ this.tableConfig = builder.tableConfig;
+ this.computedColumnArgs = builder.computedColumnArgs;
+ }
+
+ /**
+ * Builder class for constructing MongoDBSyncTableAction instances. This class follows the
+ * Builder design pattern, allowing for a more readable and maintainable way to set up complex
+ * objects.
+ */
+ public static class Builder {
+ private final String warehouse;
+ private final String database;
+ private final String table;
+ private final Configuration mongodbConfig;
+ private final Map<String, String> catalogConfig;
+ private List<String> partitionKeys = new ArrayList<>();
+ private Map<String, String> tableConfig = new HashMap<>();
+ private List<String> computedColumnArgs = new ArrayList<>();
+
+ public Builder(
+ String warehouse,
+ String database,
+ String table,
+ Map<String, String> catalogConfig,
+ Map<String, String> mongodbConfig) {
+ this.warehouse = warehouse;
+ this.database = database;
+ this.table = table;
+ this.catalogConfig = catalogConfig;
+ this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+ }
+
+ public Builder withPartitionKeys(List<String> partitionKeys) {
+ this.partitionKeys = partitionKeys;
+ return this;
+ }
+
+ public Builder withPartitionKeys(String... partitionKeys) {
+ return withPartitionKeys(Arrays.asList(partitionKeys));
+ }
+
+ public Builder withTableConfig(Map<String, String> tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
- public MongoDBSyncTableAction(
- Map<String, String> mongodbConfig,
- String warehouse,
- String database,
- String table,
- List<String> partitionKeys,
- Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
- super(warehouse, catalogConfig);
- this.mongodbConfig = Configuration.fromMap(mongodbConfig);
- this.database = database;
- this.table = table;
- this.partitionKeys = partitionKeys;
- this.tableConfig = tableConfig;
+ public Builder withComputedColumnArgs(List<String> computedColumnArgs) {
+ this.computedColumnArgs = computedColumnArgs;
+ return this;
+ }
+
+ public MongoDBSyncTableAction buildAction() {
+ return new MongoDBSyncTableAction(this);
+ }
}
@Override
@@ -106,6 +159,8 @@ public class MongoDBSyncTableAction extends ActionBase {
MongodbSchema mongodbSchema = MongodbSchema.getMongodbSchema(mongodbConfig);
catalog.createDatabase(database, true);
+ List<ComputedColumn> computedColumns =
+ buildComputedColumns(computedColumnArgs, mongodbSchema.fields());
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
@@ -116,7 +171,11 @@ public class MongoDBSyncTableAction extends ActionBase {
} else {
Schema fromMongodb =
MongoDBActionUtils.buildPaimonSchema(
- mongodbSchema, partitionKeys, tableConfig, caseSensitive);
+ mongodbSchema,
+ partitionKeys,
+ computedColumns,
+ tableConfig,
+ caseSensitive);
catalog.createTable(identifier, fromMongodb, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
@@ -133,7 +192,10 @@ public class MongoDBSyncTableAction extends ActionBase {
"MongoDB Source")
.flatMap(
new MongoDBRecordParser(
- caseSensitive, mongodbConfig)))
+ caseSensitive,
+ new TableNameConverter(caseSensitive),
+ computedColumns,
+ mongodbConfig)))
.withParserFactory(parserFactory)
.withTable(table)
.withIdentifier(identifier)
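Taken together, the new Builder replaces the old seven-argument constructor. A minimal usage sketch (paths and config maps are placeholders; only the Builder API shown in this diff is used):

    // Assemble the action fluently instead of via positional arguments.
    Map<String, String> catalogConfig = new HashMap<>();
    Map<String, String> mongodbConfig = new HashMap<>(); // hosts, username, password, database, collection
    MongoDBSyncTableAction action =
            new MongoDBSyncTableAction.Builder(
                            "/tmp/paimon/warehouse",     // placeholder warehouse path
                            "test_db",
                            "test_table",
                            catalogConfig,
                            mongodbConfig)
                    .withPartitionKeys("pt")
                    .withComputedColumnArgs(Collections.singletonList("_year=year(_date)"))
                    .buildAction();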
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index 96249895a..b9612a7df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -24,12 +24,8 @@ import org.apache.paimon.flink.action.ActionFactory;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
import java.util.Optional;
-import java.util.stream.Collectors;
/** Factory to create {@link MongoDBSyncTableAction}. */
public class MongoDBSyncTableActionFactory implements ActionFactory {
@@ -44,29 +40,26 @@ public class MongoDBSyncTableActionFactory implements ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
+ checkRequiredArgument(params, "mongodb-conf");
+
+ MongoDBSyncTableAction.Builder builder =
+ new MongoDBSyncTableAction.Builder(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "mongodb-conf"))
+ .withTableConfig(optionalConfigMap(params, "table-conf"));
- List<String> partitionKeys = Collections.emptyList();
if (params.has("partition-keys")) {
- partitionKeys =
- Arrays.stream(params.get("partition-keys").split(","))
- .collect(Collectors.toList());
+ builder.withPartitionKeys(params.get("partition-keys").split(","));
}
- checkRequiredArgument(params, "mongodb-conf");
-
- Map<String, String> mongodbConfig = optionalConfigMap(params, "mongodb-conf");
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
- Map<String, String> tableConfig = optionalConfigMap(params, "table-conf");
-
- return Optional.of(
- new MongoDBSyncTableAction(
- mongodbConfig,
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- partitionKeys,
- catalogConfig,
- tableConfig));
+ if (params.has("computed-column")) {
+ builder.withComputedColumnArgs(
+ new ArrayList<>(params.getMultiParameter("computed-column")));
+ }
+ return Optional.ofNullable(builder.buildAction());
}
@Override
@@ -81,6 +74,7 @@ public class MongoDBSyncTableActionFactory implements ActionFactory {
" mongodb-sync-table --warehouse <warehouse-path> --database
<database-name> "
+ "--table <table-name> "
+ "[--partition-keys <partition-keys>] "
+ + "[--computed-column <'column-name=expr-name(args[,
...])'> [--computed-column ...]] "
+ "[--mongodb-conf <mongodb-cdc-source-conf>
[--mongodb-conf <mongodb-cdc-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf>
[--catalog-conf <paimon-catalog-conf> ...]] "
+ "[--table-conf <paimon-table-sink-conf>
[--table-conf <paimon-table-sink-conf> ...]]");
@@ -93,6 +87,9 @@ public class MongoDBSyncTableActionFactory implements ActionFactory {
+ "this action will automatically create an unpartitioned Paimon table.");
System.out.println();
+ System.out.println("Please see doc for usage of --computed-column.");
+ System.out.println();
+
System.out.println("mongodb CDC source conf syntax:");
System.out.println(" key=value");
System.out.println(
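Note that `getMultiParameter` is what lets --computed-column repeat on the command line. A small sketch of that parsing behavior (the expressions are illustrative):

    String[] args = {
        "--computed-column", "_year=year(_date)",
        "--computed-column", "_year2=year(_ts)"
    };
    MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
    // Every occurrence of the repeated flag is collected into one collection,
    // which the factory above copies into withComputedColumnArgs(...).
    Collection<String> exprs = params.getMultiParameter("computed-column");
    // exprs -> ["_year=year(_date)", "_year2=year(_ts)"]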
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 22887109a..028af46f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb.strategy;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
@@ -49,16 +50,19 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
private final String collection;
private final boolean caseSensitive;
private final Configuration mongodbConfig;
+ private final List<ComputedColumn> computedColumns;
public Mongo4VersionStrategy(
String databaseName,
String collection,
boolean caseSensitive,
+ List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.databaseName = databaseName;
this.collection = collection;
this.caseSensitive = caseSensitive;
this.mongodbConfig = mongodbConfig;
+ this.computedColumns = computedColumns;
}
/**
@@ -115,7 +119,12 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
JsonNode fullDocument, LinkedHashMap<String, DataType> paimonFieldTypes)
throws JsonProcessingException {
Map<String, String> insert =
- getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+ getExtractRow(
+ fullDocument,
+ paimonFieldTypes,
+ caseSensitive,
+ computedColumns,
+ mongodbConfig);
return new RichCdcMultiplexRecord(
databaseName,
collection,
@@ -136,7 +145,12 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
JsonNode fullDocument, LinkedHashMap<String, DataType> paimonFieldTypes)
throws JsonProcessingException {
Map<String, String> after =
- getExtractRow(fullDocument, paimonFieldTypes, caseSensitive, mongodbConfig);
+ getExtractRow(
+ fullDocument,
+ paimonFieldTypes,
+ caseSensitive,
+ computedColumns,
+ mongodbConfig);
return new RichCdcMultiplexRecord(
databaseName,
collection,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 8582ec348..1aa048fd1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb.strategy;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
@@ -86,6 +87,7 @@ public interface MongoVersionStrategy {
JsonNode jsonNode,
LinkedHashMap<String, DataType> paimonFieldTypes,
boolean caseSensitive,
+ List<ComputedColumn> computedColumns,
Configuration mongodbConfig)
throws JsonProcessingException {
SchemaAcquisitionMode mode =
@@ -100,10 +102,16 @@ public interface MongoVersionStrategy {
document.toString(),
mongodbConfig.getString(PARSER_PATH),
mongodbConfig.getString(FIELD_NAME),
+ computedColumns,
paimonFieldTypes);
break;
case DYNAMIC:
- row = parseAndTypeJsonRow(document.toString(), paimonFieldTypes, caseSensitive);
+ row =
+ parseAndTypeJsonRow(
+ document.toString(),
+ paimonFieldTypes,
+ computedColumns,
+ caseSensitive);
break;
default:
throw new RuntimeException("Unsupported extraction mode: " + mode);
@@ -111,47 +119,59 @@ public interface MongoVersionStrategy {
return mapKeyCaseConvert(row, caseSensitive, recordKeyDuplicateErrMsg(row));
}
- /**
- * Parses a JSON string into a map and updates the data type mapping for each key.
- *
- * @param evaluate The JSON string to be parsed.
- * @param paimonFieldTypes A map to store the data types of the keys.
- * @return A map containing the parsed key-value pairs from the JSON string.
- */
+ /** Parses and types a JSON row based on the given parameters. */
default Map<String, String> parseAndTypeJsonRow(
String evaluate,
LinkedHashMap<String, DataType> paimonFieldTypes,
+ List<ComputedColumn> computedColumns,
boolean caseSensitive) {
- Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate, String.class);
- for (String column : parsedMap.keySet()) {
- paimonFieldTypes.put(caseSensitive ? column : column.toLowerCase(), DataTypes.STRING());
- }
- return extractRow(evaluate);
+ Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate, String.class);
+ return processParsedData(parsedRow, paimonFieldTypes, computedColumns, caseSensitive);
}
- /**
- * Parses specified fields from a JSON record.
- *
- * @param record The JSON record to be parsed.
- * @param fieldPaths The paths of the fields to be parsed from the JSON record.
- * @param fieldNames The names of the fields to be returned in the result map.
- * @param paimonFieldTypes A map to store the data types of the fields.
- * @return A map containing the parsed fields and their values.
- */
+ /** Parses fields from a JSON record based on the given parameters. */
static Map<String, String> parseFieldsFromJsonRecord(
String record,
String fieldPaths,
String fieldNames,
- LinkedHashMap<String, DataType> paimonFieldTypes) {
- Map<String, String> resultMap = new HashMap<>();
+ List<ComputedColumn> computedColumns,
+ LinkedHashMap<String, DataType> fieldTypes) {
String[] columnNames = fieldNames.split(",");
String[] parseNames = fieldPaths.split(",");
+ Map<String, String> parsedRow = new HashMap<>();
for (int i = 0; i < parseNames.length; i++) {
- paimonFieldTypes.put(columnNames[i], DataTypes.STRING());
String evaluate = JsonPath.read(record, parseNames[i]);
- resultMap.put(columnNames[i], Optional.ofNullable(evaluate).orElse("{}"));
+ parsedRow.put(columnNames[i], Optional.ofNullable(evaluate).orElse("{}"));
}
+
+ return processParsedData(parsedRow, fieldTypes, computedColumns, false);
+ }
+
+ /** Processes the parsed data to generate the result map and update field types. */
+ static Map<String, String> processParsedData(
+ Map<String, String> parsedRow,
+ LinkedHashMap<String, DataType> fieldTypes,
+ List<ComputedColumn> computedColumns,
+ boolean caseSensitive) {
+ int initialCapacity = parsedRow.size() + computedColumns.size();
+ Map<String, String> resultMap = new HashMap<>(initialCapacity);
+
+ parsedRow.forEach(
+ (column, value) -> {
+ String key = caseSensitive ? column : column.toLowerCase();
+ fieldTypes.putIfAbsent(key, DataTypes.STRING());
+ resultMap.put(key, value);
+ });
+ computedColumns.forEach(
+ computedColumn -> {
+ String columnName = computedColumn.columnName();
+ String fieldReference = computedColumn.fieldReference();
+ String computedValue = computedColumn.eval(parsedRow.get(fieldReference));
+
+ resultMap.put(columnName, computedValue);
+ fieldTypes.put(columnName, computedColumn.columnType());
+ });
return resultMap;
}
}
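The net effect of processParsedData: source fields pass through unchanged (typed as STRING by default), then each computed column is evaluated against the referenced source field. A simplified, self-contained sketch of that flow (class and field names here are illustrative, not Paimon's API):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    class ComputedColumnSketch {
        final String columnName;          // target column, e.g. "_year"
        final String fieldReference;      // source field it reads, e.g. "_date"
        final UnaryOperator<String> eval; // expression, e.g. year()

        ComputedColumnSketch(String columnName, String fieldReference, UnaryOperator<String> eval) {
            this.columnName = columnName;
            this.fieldReference = fieldReference;
            this.eval = eval;
        }
    }

    class ProcessParsedDataSketch {
        static Map<String, String> process(
                Map<String, String> parsedRow, List<ComputedColumnSketch> computedColumns) {
            // 1) source fields pass through unchanged
            Map<String, String> result = new HashMap<>(parsedRow);
            for (ComputedColumnSketch c : computedColumns) {
                // 2) derive each computed value from the referenced source field
                result.put(c.columnName, c.eval.apply(parsedRow.get(c.fieldReference)));
            }
            return result;
        }
    }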
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 8976d6aa4..53e8d11eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -72,8 +72,8 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
return MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, content);
}
- protected static String writeRecordsToMongoDB(String fileName, String dbName, String content) {
- return MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, content);
+ protected static void writeRecordsToMongoDB(String fileName, String dbName, String content) {
+ MONGODB_CONTAINER.executeCommandFileInSeparateDatabase(fileName, dbName, content);
}
protected MongoDBSyncTableActionBuilder syncTableActionBuilder(
@@ -98,15 +98,6 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
throw new UnsupportedOperationException();
}
- public MongoDBSyncTableActionBuilder withComputedColumnArgs(String... computedColumnArgs) {
- throw new UnsupportedOperationException();
- }
-
- public MongoDBSyncTableActionBuilder withComputedColumnArgs(
- List<String> computedColumnArgs) {
- throw new UnsupportedOperationException();
- }
-
public MongoDBSyncTableActionBuilder withTypeMappingModes(String... typeMappingModes) {
throw new UnsupportedOperationException();
}
@@ -125,6 +116,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
args.addAll(mapToArgs("--mongodb-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
args.addAll(mapToArgs("--table-conf", tableConfig));
+ args.addAll(listToArgs("--computed-column", computedColumnArgs));
args.addAll(listToArgs("--partition-keys", partitionKeys));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 97257f671..75642b89b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -163,4 +163,31 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
+
+ @Test
+ @Timeout(60)
+ public void testComputedColumn() throws Exception {
+ writeRecordsToMongoDB("test-table-1", database, "table/computedcolumn");
+ Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+ mongodbConfig.put("database", database);
+ mongodbConfig.put("collection", "test_computed_column");
+
+ MongoDBSyncTableAction action =
+ syncTableActionBuilder(mongodbConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withComputedColumnArgs("_year=year(_date)")
+ .build();
+ runActionWithDefaultEnv(action);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT()
+ },
+ new String[] {"_id", "_date", "_year"});
+ waitForResult(
+ Collections.singletonList("+I[100000000000000000000101, 2023-03-23, 2023]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.singletonList("_id"));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js
new file mode 100644
index 000000000..48b5398fa
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/table/computedcolumn/test-table-1.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('test_computed_column').insertOne({
+ "_id": ObjectId("100000000000000000000101"),
+ "_date": "2023-03-23"
+});