This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b653d18 [flink] Optimize mongodb action build mode (#1990)
84b653d18 is described below

commit 84b653d18bfb60d808317627b27c38ab6fa0ac22
Author: monster <[email protected]>
AuthorDate: Wed Sep 13 16:05:52 2023 +0800

    [flink] Optimize mongodb action build mode (#1990)
---
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     | 63 ++++++++++-----
 .../mongodb/MongoDBSyncDatabaseActionFactory.java  | 37 ++++-----
 .../action/cdc/mongodb/MongoDBSyncTableAction.java | 89 +++++++---------------
 .../cdc/mongodb/MongoDBSyncTableActionFactory.java | 11 +--
 4 files changed, 90 insertions(+), 110 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index be8a9939b..f6043824c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -66,34 +67,53 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class MongoDBSyncDatabaseAction extends ActionBase {
 
-    private final Configuration mongodbConfig;
     private final String database;
-    private final String tablePrefix;
-    private final String tableSuffix;
-    private final Map<String, String> tableConfig;
-    @Nullable private final Pattern includingPattern;
-    @Nullable private final Pattern excludingPattern;
-    private final String includingTables;
+    private final Configuration mongodbConfig;
+    private Map<String, String> tableConfig = new HashMap<>();
+    private String tablePrefix = "";
+    private String tableSuffix = "";
+    private String includingTables = ".*";
+    @Nullable String excludingTables;
 
     public MongoDBSyncDatabaseAction(
-            Map<String, String> mongodbConfig,
             String warehouse,
             String database,
-            @Nullable String tablePrefix,
-            @Nullable String tableSuffix,
-            @Nullable String includingTables,
-            @Nullable String excludingTables,
             Map<String, String> catalogConfig,
-            Map<String, String> tableConfig) {
+            Map<String, String> mongodbConfig) {
         super(warehouse, catalogConfig);
-        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
         this.database = database;
-        this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
-        this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
-        this.includingTables = includingTables == null ? ".*" : 
includingTables;
-        this.includingPattern = Pattern.compile(this.includingTables);
-        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
+    }
+
+    public MongoDBSyncDatabaseAction withTableConfig(Map<String, String> 
tableConfig) {
         this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public MongoDBSyncDatabaseAction withTablePrefix(@Nullable String 
tablePrefix) {
+        if (tablePrefix != null) {
+            this.tablePrefix = tablePrefix;
+        }
+        return this;
+    }
+
+    public MongoDBSyncDatabaseAction withTableSuffix(@Nullable String 
tableSuffix) {
+        if (tableSuffix != null) {
+            this.tableSuffix = tableSuffix;
+        }
+        return this;
+    }
+
+    public MongoDBSyncDatabaseAction includingTables(@Nullable String 
includingTables) {
+        if (includingTables != null) {
+            this.includingTables = includingTables;
+        }
+        return this;
+    }
+
+    public MongoDBSyncDatabaseAction excludingTables(@Nullable String 
excludingTables) {
+        this.excludingTables = excludingTables;
+        return this;
     }
 
     @Override
@@ -120,8 +140,9 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
                 new RichCdcMultiplexRecordSchemaBuilder(tableConfig, 
caseSensitive);
-        Pattern includingPattern = this.includingPattern;
-        Pattern excludingPattern = this.excludingPattern;
+        Pattern includingPattern = Pattern.compile(this.includingTables);
+        Pattern excludingPattern =
+                excludingTables == null ? null : 
Pattern.compile(excludingTables);
         parserFactory =
                 () ->
                         new RichCdcMultiplexRecordEventParser(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
index 2fb2e0bbc..2232807fa 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.action.ActionFactory;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
-import java.util.Map;
 import java.util.Optional;
 
 /** Factory to create {@link MongoDBSyncDatabaseAction}. */
@@ -36,33 +35,23 @@ public class MongoDBSyncDatabaseActionFactory implements 
ActionFactory {
         return IDENTIFIER;
     }
 
-    @Override
     public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "warehouse");
-        checkRequiredArgument(params, "database");
         checkRequiredArgument(params, "mongodb-conf");
 
-        String warehouse = params.get("warehouse");
-        String database = params.get("database");
-        String tablePrefix = params.get("table-prefix");
-        String tableSuffix = params.get("table-suffix");
-        String includingTables = params.get("including-tables");
-        String excludingTables = params.get("excluding-tables");
-
-        Map<String, String> mongodbConfigOption = optionalConfigMap(params, 
"mongodb-conf");
-        Map<String, String> catalogConfigOption = optionalConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfigOption = optionalConfigMap(params, 
"table-conf");
-        return Optional.of(
+        MongoDBSyncDatabaseAction action =
                 new MongoDBSyncDatabaseAction(
-                        mongodbConfigOption,
-                        warehouse,
-                        database,
-                        tablePrefix,
-                        tableSuffix,
-                        includingTables,
-                        excludingTables,
-                        catalogConfigOption,
-                        tableConfigOption));
+                        getRequiredValue(params, "warehouse"),
+                        getRequiredValue(params, "database"),
+                        optionalConfigMap(params, "catalog-conf"),
+                        optionalConfigMap(params, "mongodb-conf"));
+
+        action.withTableConfig(optionalConfigMap(params, "table-conf"))
+                .withTablePrefix(params.get("table-prefix"))
+                .withTableSuffix(params.get("table-suffix"))
+                .includingTables(params.get("including-tables"))
+                .excludingTables(params.get("excluding-tables"));
+
+        return Optional.of(action);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 336fb0fc1..ed868a0ec 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -67,73 +67,42 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class MongoDBSyncTableAction extends ActionBase {
 
-    private final Configuration mongodbConfig;
     private final String database;
     private final String table;
-    private final List<String> partitionKeys;
-    private final Map<String, String> tableConfig;
-    private final List<String> computedColumnArgs;
-
-    private MongoDBSyncTableAction(Builder builder) {
-        super(builder.warehouse, builder.catalogConfig);
-        this.mongodbConfig = builder.mongodbConfig;
-        this.database = builder.database;
-        this.table = builder.table;
-        this.partitionKeys = builder.partitionKeys;
-        this.tableConfig = builder.tableConfig;
-        this.computedColumnArgs = builder.computedColumnArgs;
+    private final Configuration mongodbConfig;
+    private List<String> partitionKeys = new ArrayList<>();
+    private Map<String, String> tableConfig = new HashMap<>();
+    private List<String> computedColumnArgs = new ArrayList<>();
+
+    public MongoDBSyncTableAction(
+            String warehouse,
+            String database,
+            String table,
+            Map<String, String> catalogConfig,
+            Map<String, String> mongodbConfig) {
+        super(warehouse, catalogConfig);
+        this.database = database;
+        this.table = table;
+        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
     }
 
-    /**
-     * Builder class for constructing MongoDBSyncTableAction instances. This 
class follows the
-     * Builder design pattern, allowing for a more readable and maintainable 
way to set up complex
-     * objects.
-     */
-    public static class Builder {
-        private final String warehouse;
-        private final String database;
-        private final String table;
-        private final Configuration mongodbConfig;
-        private final Map<String, String> catalogConfig;
-        private List<String> partitionKeys = new ArrayList<>();
-        private Map<String, String> tableConfig = new HashMap<>();
-        private List<String> computedColumnArgs = new ArrayList<>();
-
-        public Builder(
-                String warehouse,
-                String database,
-                String table,
-                Map<String, String> catalogConfig,
-                Map<String, String> mongodbConfig) {
-            this.warehouse = warehouse;
-            this.database = database;
-            this.table = table;
-            this.catalogConfig = catalogConfig;
-            this.mongodbConfig = Configuration.fromMap(mongodbConfig);
-        }
-
-        public Builder withPartitionKeys(List<String> partitionKeys) {
-            this.partitionKeys = partitionKeys;
-            return this;
-        }
-
-        public Builder withPartitionKeys(String... partitionKeys) {
-            return withPartitionKeys(Arrays.asList(partitionKeys));
-        }
+    public MongoDBSyncTableAction withPartitionKeys(List<String> 
partitionKeys) {
+        this.partitionKeys = partitionKeys;
+        return this;
+    }
 
-        public Builder withTableConfig(Map<String, String> tableConfig) {
-            this.tableConfig = tableConfig;
-            return this;
-        }
+    public MongoDBSyncTableAction withPartitionKeys(String... partitionKeys) {
+        return withPartitionKeys(Arrays.asList(partitionKeys));
+    }
 
-        public Builder withComputedColumnArgs(List<String> computedColumnArgs) 
{
-            this.computedColumnArgs = computedColumnArgs;
-            return this;
-        }
+    public MongoDBSyncTableAction withTableConfig(Map<String, String> 
tableConfig) {
+        this.tableConfig = tableConfig;
+        return this;
+    }
 
-        public MongoDBSyncTableAction buildAction() {
-            return new MongoDBSyncTableAction(this);
-        }
+    public MongoDBSyncTableAction withComputedColumnArgs(List<String> 
computedColumnArgs) {
+        this.computedColumnArgs = computedColumnArgs;
+        return this;
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index b9612a7df..ed72b9812 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -42,8 +42,8 @@ public class MongoDBSyncTableActionFactory implements 
ActionFactory {
         Tuple3<String, String, String> tablePath = getTablePath(params);
         checkRequiredArgument(params, "mongodb-conf");
 
-        MongoDBSyncTableAction.Builder builder =
-                new MongoDBSyncTableAction.Builder(
+        MongoDBSyncTableAction action =
+                new MongoDBSyncTableAction(
                                 tablePath.f0,
                                 tablePath.f1,
                                 tablePath.f2,
@@ -52,14 +52,15 @@ public class MongoDBSyncTableActionFactory implements 
ActionFactory {
                         .withTableConfig(optionalConfigMap(params, 
"table-conf"));
 
         if (params.has("partition-keys")) {
-            builder.withPartitionKeys(params.get("partition-keys").split(","));
+            action.withPartitionKeys(params.get("partition-keys").split(","));
         }
 
         if (params.has("computed-column")) {
-            builder.withComputedColumnArgs(
+            action.withComputedColumnArgs(
                     new 
ArrayList<>(params.getMultiParameter("computed-column")));
         }
-        return Optional.ofNullable(builder.buildAction());
+
+        return Optional.of(action);
     }
 
     @Override

Reply via email to