This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6416a17a0e [Feature][Engine]Metalake support for data source 
information storage and management (#9688)
6416a17a0e is described below

commit 6416a17a0e3f9e8c13a211a188245ad7b1fc31f6
Author: wtybxqm <[email protected]>
AuthorDate: Wed Sep 24 10:22:17 2025 +0800

    [Feature][Engine]Metalake support for data source information storage and 
management (#9688)
---
 config/seatunnel-env.cmd                           |  11 +-
 config/seatunnel-env.sh                            |   6 +
 docs/en/concept/metalake.md                        |  69 ++++
 docs/sidebars.js                                   |   3 +-
 docs/zh/concept/metalake.md                        |  69 ++++
 seatunnel-api/pom.xml                              |  15 +
 .../seatunnel/api/metalake/GravitinoClient.java    |  66 +++
 .../seatunnel/api/metalake/MetalakeClient.java     |  28 ++
 .../api/metalake/MetalakeClientFactory.java        |  44 ++
 .../api/metalake/MetalakeConfigUtils.java          |  96 +++++
 .../apache/seatunnel/common/utils/JsonUtils.java   |   5 +
 .../seatunnel/common/utils/PlaceholderUtils.java   |  16 +
 seatunnel-connectors-v2/connector-doris/pom.xml    |   2 +-
 .../connector-http/connector-http-base/pom.xml     |   2 +-
 .../connector-selectdb-cloud/pom.xml               |   2 +-
 .../connector-starrocks/pom.xml                    |   2 +-
 .../flink/command/FlinkTaskExecuteCommand.java     |   5 +-
 .../spark/command/SparkTaskExecuteCommand.java     |   5 +-
 .../connectors/seatunnel/jdbc/MetalakeIT.java      | 456 +++++++++++++++++++++
 ..._mysql_source_to_assert_sink_with_metalake.conf | 101 +++++
 .../core/parse/MultipleTableJobConfigParser.java   |   5 +-
 21 files changed, 999 insertions(+), 9 deletions(-)

diff --git a/config/seatunnel-env.cmd b/config/seatunnel-env.cmd
index 79c2d3c117..c5c4a50465 100644
--- a/config/seatunnel-env.cmd
+++ b/config/seatunnel-env.cmd
@@ -18,4 +18,13 @@ REM Home directory of spark distribution.
 if "%SPARK_HOME%" == "" set "SPARK_HOME=C:\Program Files\spark"
 
 REM Home directory of flink distribution.
-if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"
\ No newline at end of file
+if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"
+
+REM Whether to enable metalake (true/false).
+if "%METALAKE_ENABLED%" == "" set "META_LAKE_ENABLED=false"
+
+REM Type of metalake implementation. 
+if "%METALAKE_TYPE%" == "" set "METALAKE_TYPE=gravitino"
+
+REM Metalake service URL, format: 
http://host:port/api/metalakes/{metalake_name}/catalogs/
+if "%METALAKE_URL%" == "" set 
"METALAKE_URL=http://localhost:8090/api/metalakes/default_metalake_name/catalogs/";
\ No newline at end of file
diff --git a/config/seatunnel-env.sh b/config/seatunnel-env.sh
index 1bae8c7625..c655fe4f31 100644
--- a/config/seatunnel-env.sh
+++ b/config/seatunnel-env.sh
@@ -20,3 +20,9 @@
 SPARK_HOME=${SPARK_HOME:-/opt/spark}
 # Home directory of flink distribution.
 FLINK_HOME=${FLINK_HOME:-/opt/flink}
+# Whether to enable metalake (true/false).
+METALAKE_ENABLED=${METALAKE_ENABLED:-false}
+# Type of metalake implementation.
+METALAKE_TYPE=${METALAKE_TYPE:-gravitino}
+# Metalake service URL, format: 
http://host:port/api/metalakes/{metalake_name}/catalogs/.
+METALAKE_URL=${METALAKE_URL:-http://localhost:8090/api/metalakes/default_metalake_name/catalogs/}
diff --git a/docs/en/concept/metalake.md b/docs/en/concept/metalake.md
new file mode 100644
index 0000000000..916d76b451
--- /dev/null
+++ b/docs/en/concept/metalake.md
@@ -0,0 +1,69 @@
+# METALAKE
+
+Since Seatunnel requires database usernames, passwords, and other sensitive 
information to be written in plaintext within scripts when executing tasks, 
this may lead to information leakage and is also difficult to maintain. When 
data source information changes, manual modifications are often required.
+
+To address this, Metalake is introduced. Data source information can be stored 
in Metalake systems such as Apache Gravitino. Task scripts then use `sourceId` 
and placeholders instead of actual usernames and passwords. At runtime, the 
Seatunnel engine retrieves the information from Metalake via HTTP requests and 
replaces the placeholders accordingly.
+
+To enable Metalake, you first need to modify the environment variables in 
**seatunnel-env.sh**:
+
+* `METALAKE_ENABLED`
+* `METALAKE_TYPE`
+* `METALAKE_URL`
+
+Set `METALAKE_ENABLED` to `true`. Currently, `METALAKE_TYPE` only supports 
`gravitino`.
+
+For Apache Gravitino, set `METALAKE_URL` to:
+
+```
+http://host:port/api/metalakes/your_metalake_name/catalogs/
+```
+
+---
+
+## Usage Example
+
+First, create a catalog in Gravitino, for example:
+
+```bash
+curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs' \
+-H 'Content-Type: application/json' \
+-H 'Accept: application/vnd.gravitino.v1+json' \
+-d '{
+    "name": "test_catalog",
+    "type": "relational",
+    "provider": "jdbc-mysql",
+    "comment": "for metalake test",
+    "properties": {
+        "jdbc-driver": "com.mysql.cj.jdbc.Driver",
+        "jdbc-url": "not used",
+        "jdbc-user": "root",
+        "jdbc-password": "Abc!@#135_seatunnel"
+    }
+}'
+```
+
+This creates a `test_catalog` under `test_metalake` (note: `metalake` itself 
must be created in advance).
+
+Thus, `METALAKE_URL` can be set to:
+
+```
+http://localhost:8090/api/metalakes/test_metalake/catalogs/
+```
+
+You can then define the source as:
+
+```hocon
+source {
+    Jdbc {
+        url = 
"jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
+        driver = "${jdbc-driver}"
+        connection_check_timeout_sec = 100
+        sourceId = "test_catalog"
+        user = "${jdbc-user}"
+        password = "${jdbc-password}"
+        query = "select * from source"
+    }
+}
+```
+
+Here, `sourceId` refers to the catalog name, allowing other fields to use 
`${}` placeholders. At runtime, they will be automatically replaced. Note that 
in sinks, the same `sourceId` name is used, and placeholders must always start 
with `${` and end with `}`. Each item can contain at most one placeholder, and 
there can be content outside the placeholder as well.
\ No newline at end of file
diff --git a/docs/sidebars.js b/docs/sidebars.js
index cfd589ec11..c0640c044d 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -94,7 +94,8 @@ const sidebars = {
                 'concept/sql-config',
                 'concept/speed-limit',
                 'concept/event-listener',
-                'concept/schema-evolution'
+                'concept/schema-evolution',
+                'concept/metalake'
             ]
         },
         {
diff --git a/docs/zh/concept/metalake.md b/docs/zh/concept/metalake.md
new file mode 100644
index 0000000000..d3b52eb552
--- /dev/null
+++ b/docs/zh/concept/metalake.md
@@ -0,0 +1,69 @@
+# METALAKE
+
+由于Seatunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
+
+因此引入了metalake,将数据源的信息存储于Apache 
Gravitino等metalake中,任务脚本采用`sourceId`和占位符的方法来代替原本的用户名和密码等信息,运行时seatunnel-engine通过http请求从metalake获取信息,根据占位符进行替换。
+
+若要使用metalake,首先要修改**seatunnel-env.sh**中的环境变量:
+
+* `METALAKE_ENABLED`
+* `METALAKE_TYPE`
+* `METALAKE_URL`
+
+将`METALAKE_ENABLED`设为`true`,`METALAKE_TYPE`当前仅支持设为`gravitino`。
+
+对于Apache Gravitino,`METALAKE_URL`设为
+
+```
+http://host:port/api/metalakes/your_metalake_name/catalogs/
+```
+
+---
+
+## 使用示例:
+
+用户需要先在Gravitino中创建catalog,如
+
+```bash
+curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs'
+-H 'Content-Type: application/json'
+-H 'Accept: application/vnd.gravitino.v1+json'
+-d '{
+    "name": "test_catalog",
+    "type": "relational",
+    "provider": "jdbc-mysql",
+    "comment": "for metalake test",
+    "properties": {
+        "jdbc-driver": "com.mysql.cj.jdbc.Driver",
+        "jdbc-url": "not used",
+        "jdbc-user": "root",
+        "jdbc-password": "Abc!@#135_seatunnel"
+    }
+}'
+```
+
+这样便在`test_metalake`中创建了一个`test_catalog`(`metalake`需要提前创建)
+
+于是`METALAKE_URL`可以设为
+
+```
+http://localhost:8090/api/metalakes/test_metalake/catalogs/
+```
+
+source可以写为
+
+```
+source {
+    Jdbc {
+        url = 
"jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
+        driver = "${jdbc-driver}"
+        connection_check_timeout_sec = 100
+        sourceId = "test_catalog"
+        user = "${jdbc-user}"
+        password = "${jdbc-password}"
+        query = "select * from source"
+    }
+}
+```
+
+其中`sourceId`指代catalog的名称,从而其他项可以使用`${}`占位符,运行时会自动替换。注意,在sink中使用时,同样叫`sourceId`;使用占位符时必须以`${`开头,以`}`结尾,每一项最多只能包含一个占位符,占位符以外也可以有内容
\ No newline at end of file
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index 2ca8eea31b..f7f813ab50 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -29,6 +29,11 @@
     <artifactId>seatunnel-api</artifactId>
     <name>SeaTunnel : Api</name>
 
+    <properties>
+        <httpclient.version>4.5.13</httpclient.version>
+        <httpcore.version>4.4.16</httpcore.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -41,5 +46,15 @@
             <version>${project.version}</version>
             <classifier>optional</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/GravitinoClient.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/GravitinoClient.java
new file mode 100644
index 0000000000..f479974988
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/GravitinoClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.metalake;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+public class GravitinoClient implements MetalakeClient {
+    private final String metalakeUrl;
+
+    public GravitinoClient(String metalakeUrl) {
+        this.metalakeUrl = metalakeUrl;
+    }
+
+    @Override
+    public String getType() {
+        return "gravitino";
+    }
+
+    @Override
+    public JsonNode getMetaInfo(String sourceId) throws IOException {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet request = new HttpGet(this.metalakeUrl + sourceId);
+            request.addHeader("Accept", "application/vnd.gravitino.v1+json");
+            try (CloseableHttpResponse response = client.execute(request)) {
+                HttpEntity entity = response.getEntity();
+                if (entity == null) {
+                    throw new RuntimeException("No response entity");
+                }
+                JsonNode rootNode = JsonUtils.readTree(entity.getContent());
+                EntityUtils.consume(entity);
+                JsonNode catalogNode = rootNode.get("catalog");
+                if (catalogNode == null) {
+                    throw new RuntimeException("Response JSON has no 'catalog' 
field");
+                }
+                JsonNode propertiesNode = catalogNode.get("properties");
+                return propertiesNode;
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClient.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClient.java
new file mode 100644
index 0000000000..362e98cf8b
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClient.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.metalake;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+
+public interface MetalakeClient {
+    String getType();
+
+    JsonNode getMetaInfo(String sourceId) throws IOException;
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClientFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClientFactory.java
new file mode 100644
index 0000000000..e1d2edfd11
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeClientFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.metalake;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class MetalakeClientFactory {
+    private static final Map<String, Function<String, MetalakeClient>> 
REGISTRY = new HashMap<>();
+
+    static {
+        register("gravitino", GravitinoClient::new);
+    }
+
+    private MetalakeClientFactory() {}
+
+    public static void register(String type, Function<String, MetalakeClient> 
constructor) {
+        REGISTRY.put(type.toLowerCase(), constructor);
+    }
+
+    public static MetalakeClient create(String type, String metalakeUrl) {
+        Function<String, MetalakeClient> constructor = 
REGISTRY.get(type.toLowerCase());
+        if (constructor == null) {
+            throw new IllegalArgumentException("Unknown MetalakeClient type: " 
+ type);
+        }
+        return constructor.apply(metalakeUrl);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
new file mode 100644
index 0000000000..12f683cb89
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.metalake;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigList;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
+
+import org.apache.seatunnel.common.utils.PlaceholderUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class MetalakeConfigUtils {
+
+    public static Config getMetalakeConfig(Config jobConfigTmp) {
+        Config envConfig = jobConfigTmp.getConfig("env");
+        boolean metalakeEnabled =
+                envConfig.hasPath("metalake_enabled")
+                        ? envConfig.getBoolean("metalake_enabled")
+                        : Boolean.parseBoolean(
+                                
System.getenv().getOrDefault("METALAKE_ENABLED", "false"));
+        if (!metalakeEnabled) return jobConfigTmp;
+
+        Config update = jobConfigTmp;
+        String metalakeType =
+                envConfig.hasPath("metalake_type")
+                        ? envConfig.getString("metalake_type")
+                        : System.getenv("METALAKE_TYPE");
+        String metalakeUrl =
+                envConfig.hasPath("metalake_url")
+                        ? envConfig.getString("metalake_url")
+                        : System.getenv("METALAKE_URL");
+        MetalakeClient metalakeClient = 
MetalakeClientFactory.create(metalakeType, metalakeUrl);
+        update = replaceConfigList(update, "source", metalakeClient);
+        update = replaceConfigList(update, "sink", metalakeClient);
+        update = replaceConfigList(update, "transform", metalakeClient);
+        return update;
+    }
+
+    private static Config replaceConfigList(
+            Config updateConfig, String key, MetalakeClient metalakeClient) {
+        ConfigList list = updateConfig.getList(key);
+        List<ConfigValue> newConfigList = new ArrayList<>(list);
+
+        try {
+            for (int i = 0; i < list.size(); i++) {
+                ConfigObject Obj = (ConfigObject) list.get(i);
+                if (Obj.containsKey("sourceId")) {
+                    ConfigObject tmp = Obj;
+                    String sourceId = Obj.toConfig().getString("sourceId");
+                    JsonNode metalakeJson = 
metalakeClient.getMetaInfo(sourceId);
+                    for (Map.Entry<String, ConfigValue> entry : 
Obj.entrySet()) {
+                        String subKey = entry.getKey();
+                        ConfigValue value = entry.getValue();
+
+                        if (value.valueType() == ConfigValueType.STRING) {
+                            String strValue = (String) value.unwrapped();
+                            String newValue =
+                                    
PlaceholderUtils.replacePlaceholders(strValue, metalakeJson);
+                            tmp = tmp.withValue(subKey, 
ConfigValueFactory.fromAnyRef(newValue));
+                        }
+                    }
+                    newConfigList.set(i, tmp);
+                }
+            }
+        } catch (IOException e) {
+            log.error("Fail to get MetaInfo", e);
+        }
+        return updateConfig.withValue(key, 
ConfigValueFactory.fromIterable(newConfigList));
+    }
+}
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 5d765583ba..3d8e62684d 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -39,6 +39,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.datatype.jsr310.JavaTime
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -90,6 +91,10 @@ public class JsonUtils {
         return OBJECT_MAPPER.readTree(obj);
     }
 
+    public static JsonNode readTree(InputStream obj) throws IOException {
+        return OBJECT_MAPPER.readTree(obj);
+    }
+
     /**
      * json representation of object
      *
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
index ab697e7357..c14d1c3b47 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.common.utils;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -48,4 +50,18 @@ public class PlaceholderUtils {
         matcher.appendTail(result);
         return result.toString();
     }
+
+    public static String replacePlaceholders(String input, JsonNode 
supportedValues) {
+        Pattern pattern = Pattern.compile("\\$\\{([^}]*)\\}");
+        Matcher matcher = pattern.matcher(input);
+        if (matcher.find()) {
+            String placeholder = matcher.group(1);
+
+            if (supportedValues.has(placeholder)) {
+                String replaced = supportedValues.get(placeholder).asText();
+                return replacePlaceholders(input, placeholder, replaced);
+            }
+        }
+        return input;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/pom.xml 
b/seatunnel-connectors-v2/connector-doris/pom.xml
index 06cd650f4a..31abc16318 100644
--- a/seatunnel-connectors-v2/connector-doris/pom.xml
+++ b/seatunnel-connectors-v2/connector-doris/pom.xml
@@ -31,7 +31,7 @@
 
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
-        <httpcore.version>4.4.4</httpcore.version>
+        <httpcore.version>4.4.16</httpcore.version>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index e19e9ab984..1e3882017c 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -31,7 +31,7 @@
 
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
-        <httpcore.version>4.4.4</httpcore.version>
+        <httpcore.version>4.4.16</httpcore.version>
         <guava-retrying.version>2.0.0</guava-retrying.version>
         <mockito.version>3.12.4</mockito.version>
     </properties>
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/pom.xml 
b/seatunnel-connectors-v2/connector-selectdb-cloud/pom.xml
index 2750ddaa33..908f365d50 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/pom.xml
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/pom.xml
@@ -31,7 +31,7 @@
 
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
-        <httpcore.version>4.4.4</httpcore.version>
+        <httpcore.version>4.4.16</httpcore.version>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml 
b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 560c26ca13..52dd650969 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -32,7 +32,7 @@
     <properties>
         <connector.name>connector.starrocks</connector.name>
         <httpclient.version>4.5.13</httpclient.version>
-        <httpcore.version>4.4.4</httpcore.version>
+        <httpcore.version>4.4.16</httpcore.version>
         <mysql.version>8.0.16</mysql.version>
         <starrocks.thrift.sdk.version>1.0.1</starrocks.thrift.sdk.version>
         <arrow.version>5.0.0</arrow.version>
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
index e831fb081b..b251e381ab 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
+import org.apache.seatunnel.api.metalake.MetalakeConfigUtils;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
@@ -48,7 +49,9 @@ public class FlinkTaskExecuteCommand implements 
Command<FlinkCommandArgs> {
     public void execute() throws CommandExecuteException {
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
         checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile, 
flinkCommandArgs.getVariables());
+        Config config =
+                MetalakeConfigUtils.getMetalakeConfig(
+                        ConfigBuilder.of(configFile, 
flinkCommandArgs.getVariables()));
         // if user specified job name using command line arguments, override 
config option
         if (!flinkCommandArgs.getJobName().equals(Constants.LOGO)) {
             config =
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
index ea36bc0777..67fef2ac4c 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
+import org.apache.seatunnel.api.metalake.MetalakeConfigUtils;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
@@ -48,7 +49,9 @@ public class SparkTaskExecuteCommand implements 
Command<SparkCommandArgs> {
     public void execute() throws CommandExecuteException {
         Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
         checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile, 
sparkCommandArgs.getVariables());
+        Config config =
+                MetalakeConfigUtils.getMetalakeConfig(
+                        ConfigBuilder.of(configFile, 
sparkCommandArgs.getVariables()));
         if (!sparkCommandArgs.getJobName().equals(Constants.LOGO)) {
             config =
                     config.withValue(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
new file mode 100644
index 0000000000..45ff2538b4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.github.dockerjava.api.DockerClient;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.awaitility.Awaitility.given;
+
+public class MetalakeIT extends SeaTunnelContainer {
+
+    protected GenericContainer<?> dbServer;
+
+    protected JdbcCase jdbcCase;
+
+    protected Connection connection;
+
+    protected Catalog catalog;
+
+    protected DockerClient dockerClient = DockerClientFactory.lazyClient();
+
+    protected static final String HOST = "HOST";
+
+    private static final String MYSQL_IMAGE = "mysql:8.0";
+    private static final String MYSQL_CONTAINER_HOST = "mysql-e2e";
+    private static final String MYSQL_DATABASE = "seatunnel";
+    private static final String MYSQL_SOURCE = "source";
+    private static final String MYSQL_SINK = "sink";
+    private static final String CATALOG_DATABASE = "catalog_database";
+
+    private static final String MYSQL_USERNAME = "root";
+    private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel";
+    private static final int MYSQL_PORT = 3306;
+    private static final String MYSQL_URL = "jdbc:mysql://" + HOST + 
":%s/%s?useSSL=false";
+    private static final String URL = "jdbc:mysql://" + HOST + 
":3306/seatunnel";
+
+    private static final String SQL = "select * from seatunnel.source";
+
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/mysql_to_mysql_with_metalake.conf");
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s\n"
+                    + "(\n"
+                    + "    `c-bit_1`                bit(1)                
DEFAULT NULL,\n"
+                    + "    `c_bit_8`                bit(8)                
DEFAULT NULL,\n"
+                    + "    `c_bit_16`               bit(16)               
DEFAULT NULL,\n"
+                    + "    `c_bit_32`               bit(32)               
DEFAULT NULL,\n"
+                    + "    `c_bit_64`               bit(64)               
DEFAULT NULL,\n"
+                    + "    `c_bigint_30`            BIGINT(40)  unsigned  
DEFAULT NULL,\n"
+                    + "    UNIQUE (c_bigint_30)\n"
+                    + ");";
+
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+        // super.startUp();
+        server =
+                new GenericContainer<>(getDockerImage())
+                        .withNetwork(NETWORK)
+                        .withEnv("TZ", "UTC")
+                        .withEnv("METALAKE_ENABLED", "true")
+                        .withEnv("METALAKE_TYPE", "gravitino")
+                        .withEnv(
+                                "METALAKE_URL",
+                                
"http://127.0.0.1:8090/api/metalakes/test_metalake/catalogs/";)
+                        .withCommand(buildStartCommand())
+                        .withNetworkAliases("server")
+                        .withExposedPorts()
+                        .withFileSystemBind("/tmp", "/opt/hive")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .waitingFor(Wait.forLogMessage(".*received new worker 
register:.*", 1));
+        copySeaTunnelStarterToContainer(server);
+        server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+                Paths.get(SEATUNNEL_HOME, "config").toString());
+
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+                Paths.get(SEATUNNEL_HOME, 
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
+        // execute extra commands
+        executeExtraCommands(server);
+        server.start();
+
+        server.execInContainer(
+                "bash",
+                "-c",
+                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && wget "
+                        + driverUrl()
+                        + " --no-check-certificate"
+                        + "&& mkdir -p /tmp/gravitino && cd /tmp/gravitino && 
curl -C - --retry 5 -L -k -o gravitino-0.9.1-bin.tar.gz 
https://dlcdn.apache.org/gravitino/0.9.1/gravitino-0.9.1-bin.tar.gz && tar 
-zxvf gravitino-0.9.1-bin.tar.gz && cd /tmp/gravitino/gravitino-0.9.1-bin && 
./bin/gravitino.sh start");
+
+        server.execInContainer(
+                "bash",
+                "-c",
+                "sleep 60 && curl -L 'http://127.0.0.1:8090/api/metalakes' -H 
'Content-Type: application/json' -H 'Accept: application/vnd.gravitino.v1+json' 
-d '{\"name\":\"test_metalake\",\"comment\":\"for metalake 
test\",\"properties\":{}}'"
+                        + "&& curl -L 
'http://127.0.0.1:8090/api/metalakes/test_metalake/catalogs' -H 'Content-Type: 
application/json' -H 'Accept: application/vnd.gravitino.v1+json' -d 
'{\"name\":\"test_catalog\",\"type\":\"relational\",\"provider\":\"jdbc-mysql\",\"comment\":\"for
 metalake 
test\",\"properties\":{\"jdbc-driver\":\"com.mysql.cj.jdbc.Driver\",\"jdbc-url\":\"not
 used\",\"jdbc-user\":\"root\",\"jdbc-password\":\"Abc!@#135_seatunnel\"}}'");
+
+        dbServer = 
initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
+
+        Startables.deepStart(Stream.of(dbServer)).join();
+
+        jdbcCase = getJdbcCase();
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+
+        createNeededTables();
+        insertTestData();
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+        }
+
+        if (connection != null) {
+            connection.close();
+        }
+
+        if (dbServer != null) {
+            dbServer.close();
+            try {
+                
dockerClient.removeImageCmd(dbServer.getDockerImageName()).exec();
+            } catch (Exception ignored) {
+                ignored.printStackTrace();
+            }
+        }
+
+        super.tearDown();
+    }
+
+    @Test
+    public void TestMetalake() throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
executeJob("/jdbc_mysql_source_to_assert_sink_with_metalake.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";;
+    }
+
+    protected GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(MYSQL_IMAGE);
+
+        GenericContainer<?> container =
+                new MySQLContainer<>(imageName)
+                        .withUsername(MYSQL_USERNAME)
+                        .withPassword(MYSQL_PASSWORD)
+                        .withDatabaseName(MYSQL_DATABASE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_CONTAINER_HOST)
+                        .withExposedPorts(MYSQL_PORT)
+                        .waitingFor(Wait.forHealthcheck())
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_IMAGE)));
+
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", MYSQL_PORT, 
MYSQL_PORT)));
+
+        return container;
+    }
+
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(MYSQL_URL, MYSQL_PORT, MYSQL_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(MYSQL_DATABASE, MYSQL_SOURCE, 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(MYSQL_IMAGE)
+                .networkAliases(MYSQL_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(MYSQL_PORT)
+                .localPort(MYSQL_PORT)
+                .jdbcTemplate(MYSQL_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(MYSQL_USERNAME)
+                .password(MYSQL_PASSWORD)
+                .database(MYSQL_DATABASE)
+                .sourceTable(MYSQL_SOURCE)
+                .sinkTable(MYSQL_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .catalogDatabase(CATALOG_DATABASE)
+                .catalogTable(MYSQL_SINK)
+                .tablePathFullName(MYSQL_DATABASE + "." + MYSQL_SOURCE)
+                .build();
+    }
+
+    protected void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, 
IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+        connection.setAutoCommit(false);
+    }
+
+    protected void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+
+            String createSource =
+                    String.format(
+                            createTemplate,
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(),
+                                    jdbcCase.getSchema(),
+                                    jdbcCase.getSourceTable()));
+            statement.execute(createSource);
+
+            if (jdbcCase.getAdditionalSqlOnSource() != null) {
+                String additionalSql =
+                        String.format(
+                                jdbcCase.getAdditionalSqlOnSource(),
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSourceTable()));
+                statement.execute(additionalSql);
+            }
+
+            if (!jdbcCase.isUseSaveModeCreateTable()) {
+                if (jdbcCase.getSinkCreateSql() != null) {
+                    createTemplate = jdbcCase.getSinkCreateSql();
+                }
+                String createSink =
+                        String.format(
+                                createTemplate,
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSinkTable()));
+                statement.execute(createSink);
+            }
+
+            if (jdbcCase.getAdditionalSqlOnSink() != null) {
+                String additionalSql =
+                        String.format(
+                                jdbcCase.getAdditionalSqlOnSink(),
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSinkTable()));
+                statement.execute(additionalSql);
+            }
+
+            connection.commit();
+        } catch (Exception exception) {
+            exception.printStackTrace();
+        }
+    }
+
+    protected void insertTestData() {
+        try (PreparedStatement preparedStatement =
+                connection.prepareStatement(jdbcCase.getInsertSql())) {
+
+            List<SeaTunnelRow> rows = jdbcCase.getTestData().getValue();
+
+            for (SeaTunnelRow row : rows) {
+                for (int index = 0; index < row.getArity(); index++) {
+                    preparedStatement.setObject(index + 1, 
row.getField(index));
+                }
+                preparedStatement.addBatch();
+            }
+
+            preparedStatement.executeBatch();
+
+            connection.commit();
+        } catch (Exception exception) {
+            exception.printStackTrace();
+        }
+    }
+
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "c-bit_1", "c_bit_8", "c_bit_16", "c_bit_32", "c_bit_64", 
"c_bigint_30",
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+        BigDecimal decimalValue = new 
BigDecimal("999999999999999999999999999899");
+        for (int i = 0; i < 100; i++) {
+            byte byteArr = Integer.valueOf(i).byteValue();
+            SeaTunnelRow row;
+            if (i == 99) {
+                row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    (byte) 0,
+                                    new byte[] {byteArr},
+                                    new byte[] {byteArr, byteArr},
+                                    new byte[] {byteArr, byteArr, byteArr, 
byteArr},
+                                    new byte[] {
+                                        byteArr, byteArr, byteArr, byteArr, 
byteArr, byteArr,
+                                        byteArr, byteArr
+                                    },
+                                    // 
https://github.com/apache/seatunnel/issues/5559 this value
+                                    // cannot set null, this null
+                                    // value column's row will be lost in
+                                    // 
jdbc_mysql_source_and_sink_parallel.conf,jdbc_mysql_source_and_sink_parallel_upper_lower.conf.
+                                    bigintValue.add(BigDecimal.valueOf(i)),
+                                });
+            } else {
+                row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    i % 2 == 0 ? (byte) 1 : (byte) 0,
+                                    new byte[] {byteArr},
+                                    new byte[] {byteArr, byteArr},
+                                    new byte[] {byteArr, byteArr, byteArr, 
byteArr},
+                                    new byte[] {
+                                        byteArr, byteArr, byteArr, byteArr, 
byteArr, byteArr,
+                                        byteArr, byteArr
+                                    },
+                                    bigintValue.add(BigDecimal.valueOf(i)),
+                                });
+            }
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    public String insertTable(String schema, String table, String... fields) {
+        String columns =
+                
Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", 
"));
+        String placeholders = Arrays.stream(fields).map(f -> 
"?").collect(Collectors.joining(", "));
+
+        return "INSERT INTO "
+                + buildTableInfoWithSchema(schema, table)
+                + " ("
+                + columns
+                + " )"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    protected Class<?> loadDriverClass() {
+        try {
+            return Class.forName(jdbcCase.getDriverClass());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to load driver class: " + 
jdbcCase.getDriverClass(), e);
+        }
+    }
+
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(database, table);
+    }
+
+    public String buildTableInfoWithSchema(String schema, String table) {
+        if (StringUtils.isNotBlank(schema)) {
+            return quoteIdentifier(schema) + "." + quoteIdentifier(table);
+        } else {
+            return quoteIdentifier(table);
+        }
+    }
+
+    public String quoteIdentifier(String field) {
+        return "`" + field + "`";
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_metalake.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_metalake.conf
new file mode 100644
index 0000000000..0038bbd9dd
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_metalake.conf
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+      url = 
"jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
+      driver = "com.mysql.cj.jdbc.Driver"
+      connection_check_timeout_sec = 100
+      sourceId = "test_catalog"
+      user = "${jdbc-user}"
+      password = "${jdbc-password}"
+      query = "select * from source"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+      rules =
+        {
+          row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 101
+            },
+            {
+              rule_type = MIN_ROW
+              rule_value = 99
+            }
+          ],
+          field_rules = [
+          {
+            field_name = c_bit_8
+            field_type = bytes
+            field_value = [
+              {
+                  rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_bit_16
+            field_type = bytes
+            field_value = [
+              {
+                  rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_bit_32
+            field_type = bytes
+            field_value = [
+              {
+                  rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_bit_64
+            field_type = bytes
+            field_value = [
+              {
+                  rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_bigint_30
+            field_type = "decimal(20,0)"
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          ]
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b5b27db163..c815a03176 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.metalake.MetalakeConfigUtils;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.options.EnvCommonOptions;
 import org.apache.seatunnel.api.options.EnvOptionRule;
@@ -172,7 +173,9 @@ public class MultipleTableJobConfigParser {
         this.jobConfig = jobConfig;
         this.commonPluginJars = commonPluginJars;
         this.isStartWithSavePoint = isStartWithSavePoint;
-        this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
+        this.seaTunnelJobConfig =
+                MetalakeConfigUtils.getMetalakeConfig(
+                        ConfigBuilder.of(Paths.get(jobDefineFilePath), 
variables));
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.pipelineCheckpoints = pipelineCheckpoints;
         ConfigValidator.of(this.envOptions).validate(new 
EnvOptionRule().optionRule());

Reply via email to