This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 84b653d18 [flink] Optimize mongodb action build mode (#1990)
84b653d18 is described below
commit 84b653d18bfb60d808317627b27c38ab6fa0ac22
Author: monster <[email protected]>
AuthorDate: Wed Sep 13 16:05:52 2023 +0800
[flink] Optimize mongodb action build mode (#1990)
---
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 63 ++++++++++-----
.../mongodb/MongoDBSyncDatabaseActionFactory.java | 37 ++++-----
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 89 +++++++---------------
.../cdc/mongodb/MongoDBSyncTableActionFactory.java | 11 +--
4 files changed, 90 insertions(+), 110 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index be8a9939b..f6043824c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -39,6 +39,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -66,34 +67,53 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class MongoDBSyncDatabaseAction extends ActionBase {
- private final Configuration mongodbConfig;
private final String database;
- private final String tablePrefix;
- private final String tableSuffix;
- private final Map<String, String> tableConfig;
- @Nullable private final Pattern includingPattern;
- @Nullable private final Pattern excludingPattern;
- private final String includingTables;
+ private final Configuration mongodbConfig;
+ private Map<String, String> tableConfig = new HashMap<>();
+ private String tablePrefix = "";
+ private String tableSuffix = "";
+ private String includingTables = ".*";
+ @Nullable String excludingTables;
public MongoDBSyncDatabaseAction(
- Map<String, String> mongodbConfig,
String warehouse,
String database,
- @Nullable String tablePrefix,
- @Nullable String tableSuffix,
- @Nullable String includingTables,
- @Nullable String excludingTables,
Map<String, String> catalogConfig,
- Map<String, String> tableConfig) {
+ Map<String, String> mongodbConfig) {
super(warehouse, catalogConfig);
- this.mongodbConfig = Configuration.fromMap(mongodbConfig);
this.database = database;
- this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
- this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
- this.includingTables = includingTables == null ? ".*" :
includingTables;
- this.includingPattern = Pattern.compile(this.includingTables);
- this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
+ this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+ }
+
+ public MongoDBSyncDatabaseAction withTableConfig(Map<String, String>
tableConfig) {
this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public MongoDBSyncDatabaseAction withTablePrefix(@Nullable String
tablePrefix) {
+ if (tablePrefix != null) {
+ this.tablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public MongoDBSyncDatabaseAction withTableSuffix(@Nullable String
tableSuffix) {
+ if (tableSuffix != null) {
+ this.tableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
+ public MongoDBSyncDatabaseAction includingTables(@Nullable String
includingTables) {
+ if (includingTables != null) {
+ this.includingTables = includingTables;
+ }
+ return this;
+ }
+
+ public MongoDBSyncDatabaseAction excludingTables(@Nullable String
excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
}
@Override
@@ -120,8 +140,9 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
new RichCdcMultiplexRecordSchemaBuilder(tableConfig,
caseSensitive);
- Pattern includingPattern = this.includingPattern;
- Pattern excludingPattern = this.excludingPattern;
+ Pattern includingPattern = Pattern.compile(this.includingTables);
+ Pattern excludingPattern =
+ excludingTables == null ? null :
Pattern.compile(excludingTables);
parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
index 2fb2e0bbc..2232807fa 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.action.ActionFactory;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import java.util.Map;
import java.util.Optional;
/** Factory to create {@link MongoDBSyncDatabaseAction}. */
@@ -36,33 +35,23 @@ public class MongoDBSyncDatabaseActionFactory implements
ActionFactory {
return IDENTIFIER;
}
- @Override
public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "warehouse");
- checkRequiredArgument(params, "database");
checkRequiredArgument(params, "mongodb-conf");
- String warehouse = params.get("warehouse");
- String database = params.get("database");
- String tablePrefix = params.get("table-prefix");
- String tableSuffix = params.get("table-suffix");
- String includingTables = params.get("including-tables");
- String excludingTables = params.get("excluding-tables");
-
- Map<String, String> mongodbConfigOption = optionalConfigMap(params,
"mongodb-conf");
- Map<String, String> catalogConfigOption = optionalConfigMap(params,
"catalog-conf");
- Map<String, String> tableConfigOption = optionalConfigMap(params,
"table-conf");
- return Optional.of(
+ MongoDBSyncDatabaseAction action =
new MongoDBSyncDatabaseAction(
- mongodbConfigOption,
- warehouse,
- database,
- tablePrefix,
- tableSuffix,
- includingTables,
- excludingTables,
- catalogConfigOption,
- tableConfigOption));
+ getRequiredValue(params, "warehouse"),
+ getRequiredValue(params, "database"),
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "mongodb-conf"));
+
+ action.withTableConfig(optionalConfigMap(params, "table-conf"))
+ .withTablePrefix(params.get("table-prefix"))
+ .withTableSuffix(params.get("table-suffix"))
+ .includingTables(params.get("including-tables"))
+ .excludingTables(params.get("excluding-tables"));
+
+ return Optional.of(action);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 336fb0fc1..ed868a0ec 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -67,73 +67,42 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class MongoDBSyncTableAction extends ActionBase {
- private final Configuration mongodbConfig;
private final String database;
private final String table;
- private final List<String> partitionKeys;
- private final Map<String, String> tableConfig;
- private final List<String> computedColumnArgs;
-
- private MongoDBSyncTableAction(Builder builder) {
- super(builder.warehouse, builder.catalogConfig);
- this.mongodbConfig = builder.mongodbConfig;
- this.database = builder.database;
- this.table = builder.table;
- this.partitionKeys = builder.partitionKeys;
- this.tableConfig = builder.tableConfig;
- this.computedColumnArgs = builder.computedColumnArgs;
+ private final Configuration mongodbConfig;
+ private List<String> partitionKeys = new ArrayList<>();
+ private Map<String, String> tableConfig = new HashMap<>();
+ private List<String> computedColumnArgs = new ArrayList<>();
+
+ public MongoDBSyncTableAction(
+ String warehouse,
+ String database,
+ String table,
+ Map<String, String> catalogConfig,
+ Map<String, String> mongodbConfig) {
+ super(warehouse, catalogConfig);
+ this.database = database;
+ this.table = table;
+ this.mongodbConfig = Configuration.fromMap(mongodbConfig);
}
- /**
- * Builder class for constructing MongoDBSyncTableAction instances. This
class follows the
- * Builder design pattern, allowing for a more readable and maintainable
way to set up complex
- * objects.
- */
- public static class Builder {
- private final String warehouse;
- private final String database;
- private final String table;
- private final Configuration mongodbConfig;
- private final Map<String, String> catalogConfig;
- private List<String> partitionKeys = new ArrayList<>();
- private Map<String, String> tableConfig = new HashMap<>();
- private List<String> computedColumnArgs = new ArrayList<>();
-
- public Builder(
- String warehouse,
- String database,
- String table,
- Map<String, String> catalogConfig,
- Map<String, String> mongodbConfig) {
- this.warehouse = warehouse;
- this.database = database;
- this.table = table;
- this.catalogConfig = catalogConfig;
- this.mongodbConfig = Configuration.fromMap(mongodbConfig);
- }
-
- public Builder withPartitionKeys(List<String> partitionKeys) {
- this.partitionKeys = partitionKeys;
- return this;
- }
-
- public Builder withPartitionKeys(String... partitionKeys) {
- return withPartitionKeys(Arrays.asList(partitionKeys));
- }
+ public MongoDBSyncTableAction withPartitionKeys(List<String>
partitionKeys) {
+ this.partitionKeys = partitionKeys;
+ return this;
+ }
- public Builder withTableConfig(Map<String, String> tableConfig) {
- this.tableConfig = tableConfig;
- return this;
- }
+ public MongoDBSyncTableAction withPartitionKeys(String... partitionKeys) {
+ return withPartitionKeys(Arrays.asList(partitionKeys));
+ }
- public Builder withComputedColumnArgs(List<String> computedColumnArgs)
{
- this.computedColumnArgs = computedColumnArgs;
- return this;
- }
+ public MongoDBSyncTableAction withTableConfig(Map<String, String>
tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
- public MongoDBSyncTableAction buildAction() {
- return new MongoDBSyncTableAction(this);
- }
+ public MongoDBSyncTableAction withComputedColumnArgs(List<String>
computedColumnArgs) {
+ this.computedColumnArgs = computedColumnArgs;
+ return this;
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index b9612a7df..ed72b9812 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -42,8 +42,8 @@ public class MongoDBSyncTableActionFactory implements
ActionFactory {
Tuple3<String, String, String> tablePath = getTablePath(params);
checkRequiredArgument(params, "mongodb-conf");
- MongoDBSyncTableAction.Builder builder =
- new MongoDBSyncTableAction.Builder(
+ MongoDBSyncTableAction action =
+ new MongoDBSyncTableAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
@@ -52,14 +52,15 @@ public class MongoDBSyncTableActionFactory implements
ActionFactory {
.withTableConfig(optionalConfigMap(params,
"table-conf"));
if (params.has("partition-keys")) {
- builder.withPartitionKeys(params.get("partition-keys").split(","));
+ action.withPartitionKeys(params.get("partition-keys").split(","));
}
if (params.has("computed-column")) {
- builder.withComputedColumnArgs(
+ action.withComputedColumnArgs(
new
ArrayList<>(params.getMultiParameter("computed-column")));
}
- return Optional.ofNullable(builder.buildAction());
+
+ return Optional.of(action);
}
@Override